]> git.ipfire.org Git - thirdparty/apache/httpd.git/blob
1732102
[thirdparty/apache/httpd.git] /
1 /* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
2 *
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <stddef.h>
17 #include <apr_strings.h>
18 #include <nghttp2/nghttp2.h>
19
20 #include <mpm_common.h>
21 #include <httpd.h>
22 #include <mod_proxy.h>
23
24 #include "mod_http2.h"
25 #include "h2.h"
26 #include "h2_util.h"
27 #include "h2_proxy_session.h"
28
29 APLOG_USE_MODULE(proxy_http2);
30
31 typedef struct h2_proxy_stream {
32 int id;
33 apr_pool_t *pool;
34 h2_proxy_session *session;
35
36 const char *url;
37 request_rec *r;
38 h2_request *req;
39
40 h2_stream_state_t state;
41 unsigned int suspended : 1;
42 unsigned int data_received : 1;
43
44 apr_bucket_brigade *input;
45 apr_bucket_brigade *output;
46
47 apr_table_t *saves;
48 } h2_proxy_stream;
49
50
51 static void dispatch_event(h2_proxy_session *session, h2_proxys_event_t ev,
52 int arg, const char *msg);
53
54
55 static apr_status_t proxy_session_pre_close(void *theconn)
56 {
57 proxy_conn_rec *p_conn = (proxy_conn_rec *)theconn;
58 h2_proxy_session *session = p_conn->data;
59
60 if (session && session->ngh2) {
61 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
62 "proxy_session(%s): pool cleanup, state=%d, streams=%d",
63 session->id, session->state,
64 (int)h2_ihash_count(session->streams));
65 session->aborted = 1;
66 dispatch_event(session, H2_PROXYS_EV_PRE_CLOSE, 0, NULL);
67 nghttp2_session_del(session->ngh2);
68 session->ngh2 = NULL;
69 p_conn->data = NULL;
70 }
71 return APR_SUCCESS;
72 }
73
74 static int proxy_pass_brigade(apr_bucket_alloc_t *bucket_alloc,
75 proxy_conn_rec *p_conn,
76 conn_rec *origin, apr_bucket_brigade *bb,
77 int flush)
78 {
79 apr_status_t status;
80 apr_off_t transferred;
81
82 if (flush) {
83 apr_bucket *e = apr_bucket_flush_create(bucket_alloc);
84 APR_BRIGADE_INSERT_TAIL(bb, e);
85 }
86 apr_brigade_length(bb, 0, &transferred);
87 if (transferred != -1)
88 p_conn->worker->s->transferred += transferred;
89 status = ap_pass_brigade(origin->output_filters, bb);
90 /* Cleanup the brigade now to avoid buckets lifetime
91 * issues in case of error returned below. */
92 apr_brigade_cleanup(bb);
93 if (status != APR_SUCCESS) {
94 ap_log_cerror(APLOG_MARK, APLOG_ERR, status, origin, APLOGNO(03357)
95 "pass output failed to %pI (%s)",
96 p_conn->addr, p_conn->hostname);
97 }
98 return status;
99 }
100
101 static ssize_t raw_send(nghttp2_session *ngh2, const uint8_t *data,
102 size_t length, int flags, void *user_data)
103 {
104 h2_proxy_session *session = user_data;
105 apr_bucket *b;
106 apr_status_t status;
107 int flush = 1;
108
109 if (data) {
110 b = apr_bucket_transient_create((const char*)data, length,
111 session->c->bucket_alloc);
112 APR_BRIGADE_INSERT_TAIL(session->output, b);
113 }
114
115 status = proxy_pass_brigade(session->c->bucket_alloc,
116 session->p_conn, session->c,
117 session->output, flush);
118 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
119 "h2_proxy_sesssion(%s): raw_send %d bytes, flush=%d",
120 session->id, (int)length, flush);
121 if (status != APR_SUCCESS) {
122 return NGHTTP2_ERR_CALLBACK_FAILURE;
123 }
124 return length;
125 }
126
127 static int on_frame_recv(nghttp2_session *ngh2, const nghttp2_frame *frame,
128 void *user_data)
129 {
130 h2_proxy_session *session = user_data;
131 int n;
132
133 if (APLOGcdebug(session->c)) {
134 char buffer[256];
135
136 h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
137 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03341)
138 "h2_proxy_session(%s): recv FRAME[%s]",
139 session->id, buffer);
140 }
141
142 switch (frame->hd.type) {
143 case NGHTTP2_HEADERS:
144 break;
145 case NGHTTP2_PUSH_PROMISE:
146 break;
147 case NGHTTP2_SETTINGS:
148 if (frame->settings.niv > 0) {
149 n = nghttp2_session_get_remote_settings(ngh2, NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS);
150 if (n > 0) {
151 session->remote_max_concurrent = n;
152 }
153 }
154 break;
155 case NGHTTP2_GOAWAY:
156 /* we expect the remote server to tell us the highest stream id
157 * that it has started processing. */
158 session->last_stream_id = frame->goaway.last_stream_id;
159 dispatch_event(session, H2_PROXYS_EV_REMOTE_GOAWAY, 0, NULL);
160 if (APLOGcinfo(session->c)) {
161 char buffer[256];
162
163 h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
164 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03342)
165 "h2_proxy_session(%s): recv FRAME[%s]",
166 session->id, buffer);
167 }
168 break;
169 default:
170 break;
171 }
172 return 0;
173 }
174
175 static int before_frame_send(nghttp2_session *ngh2,
176 const nghttp2_frame *frame, void *user_data)
177 {
178 h2_proxy_session *session = user_data;
179 if (APLOGcdebug(session->c)) {
180 char buffer[256];
181
182 h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
183 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03343)
184 "h2_proxy_session(%s): sent FRAME[%s]",
185 session->id, buffer);
186 }
187 return 0;
188 }
189
190 static int add_header(void *table, const char *n, const char *v)
191 {
192 apr_table_addn(table, n, v);
193 return 1;
194 }
195
196 static void process_proxy_header(request_rec *r, const char *n, const char *v)
197 {
198 static const struct {
199 const char *name;
200 ap_proxy_header_reverse_map_fn func;
201 } transform_hdrs[] = {
202 { "Location", ap_proxy_location_reverse_map },
203 { "Content-Location", ap_proxy_location_reverse_map },
204 { "URI", ap_proxy_location_reverse_map },
205 { "Destination", ap_proxy_location_reverse_map },
206 { "Set-Cookie", ap_proxy_cookie_reverse_map },
207 { NULL, NULL }
208 };
209 proxy_dir_conf *dconf;
210 int i;
211
212 for (i = 0; transform_hdrs[i].name; ++i) {
213 if (!ap_casecmpstr(transform_hdrs[i].name, n)) {
214 dconf = ap_get_module_config(r->per_dir_config, &proxy_module);
215 apr_table_add(r->headers_out, n,
216 (*transform_hdrs[i].func)(r, dconf, v));
217 return;
218 }
219 }
220 apr_table_add(r->headers_out, n, v);
221 }
222
223 static apr_status_t h2_proxy_stream_add_header_out(h2_proxy_stream *stream,
224 const char *n, apr_size_t nlen,
225 const char *v, apr_size_t vlen)
226 {
227 if (n[0] == ':') {
228 if (!stream->data_received && !strncmp(":status", n, nlen)) {
229 char *s = apr_pstrndup(stream->r->pool, v, vlen);
230
231 apr_table_setn(stream->r->notes, "proxy-status", s);
232 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
233 "h2_proxy_stream(%s-%d): got status %s",
234 stream->session->id, stream->id, s);
235 stream->r->status = (int)apr_atoi64(s);
236 if (stream->r->status <= 0) {
237 stream->r->status = 500;
238 return APR_EGENERAL;
239 }
240 }
241 return APR_SUCCESS;
242 }
243
244 if (!h2_proxy_res_ignore_header(n, nlen)) {
245 char *hname, *hvalue;
246
247 hname = apr_pstrndup(stream->pool, n, nlen);
248 h2_util_camel_case_header(hname, nlen);
249 hvalue = apr_pstrndup(stream->pool, v, vlen);
250
251 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
252 "h2_proxy_stream(%s-%d): got header %s: %s",
253 stream->session->id, stream->id, hname, hvalue);
254 process_proxy_header(stream->r, hname, hvalue);
255 }
256 return APR_SUCCESS;
257 }
258
259 static int log_header(void *ctx, const char *key, const char *value)
260 {
261 h2_proxy_stream *stream = ctx;
262 ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, stream->r,
263 "h2_proxy_stream(%s-%d), header_out %s: %s",
264 stream->session->id, stream->id, key, value);
265 return 1;
266 }
267
268 static void h2_proxy_stream_end_headers_out(h2_proxy_stream *stream)
269 {
270 h2_proxy_session *session = stream->session;
271 request_rec *r = stream->r;
272 apr_pool_t *p = r->pool;
273
274 /* Now, add in the cookies from the response to the ones already saved */
275 apr_table_do(add_header, stream->saves, r->headers_out, "Set-Cookie", NULL);
276
277 /* and now load 'em all in */
278 if (!apr_is_empty_table(stream->saves)) {
279 apr_table_unset(r->headers_out, "Set-Cookie");
280 r->headers_out = apr_table_overlay(p, r->headers_out, stream->saves);
281 }
282
283 /* handle Via header in response */
284 if (session->conf->viaopt != via_off
285 && session->conf->viaopt != via_block) {
286 const char *server_name = ap_get_server_name(stream->r);
287 apr_port_t port = ap_get_server_port(stream->r);
288 char portstr[32];
289
290 /* If USE_CANONICAL_NAME_OFF was configured for the proxy virtual host,
291 * then the server name returned by ap_get_server_name() is the
292 * origin server name (which does make too much sense with Via: headers)
293 * so we use the proxy vhost's name instead.
294 */
295 if (server_name == stream->r->hostname) {
296 server_name = stream->r->server->server_hostname;
297 }
298 if (ap_is_default_port(port, stream->r)) {
299 portstr[0] = '\0';
300 }
301 else {
302 apr_snprintf(portstr, sizeof(portstr), ":%d", port);
303 }
304
305 /* create a "Via:" response header entry and merge it */
306 apr_table_addn(r->headers_out, "Via",
307 (session->conf->viaopt == via_full)
308 ? apr_psprintf(p, "%d.%d %s%s (%s)",
309 HTTP_VERSION_MAJOR(r->proto_num),
310 HTTP_VERSION_MINOR(r->proto_num),
311 server_name, portstr,
312 AP_SERVER_BASEVERSION)
313 : apr_psprintf(p, "%d.%d %s%s",
314 HTTP_VERSION_MAJOR(r->proto_num),
315 HTTP_VERSION_MINOR(r->proto_num),
316 server_name, portstr)
317 );
318 }
319
320 if (APLOGrtrace2(stream->r)) {
321 ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, stream->r,
322 "h2_proxy_stream(%s-%d), header_out after merging",
323 stream->session->id, stream->id);
324 apr_table_do(log_header, stream, stream->r->headers_out, NULL);
325 }
326 }
327
328 static int on_data_chunk_recv(nghttp2_session *ngh2, uint8_t flags,
329 int32_t stream_id, const uint8_t *data,
330 size_t len, void *user_data)
331 {
332 h2_proxy_session *session = user_data;
333 h2_proxy_stream *stream;
334 apr_bucket *b;
335 apr_status_t status;
336
337 stream = nghttp2_session_get_stream_user_data(ngh2, stream_id);
338 if (!stream) {
339 ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(03358)
340 "h2_proxy_session(%s): recv data chunk for "
341 "unknown stream %d, ignored",
342 session->id, stream_id);
343 return 0;
344 }
345
346 if (!stream->data_received) {
347 /* last chance to manipulate response headers.
348 * after this, only trailers */
349 h2_proxy_stream_end_headers_out(stream);
350 stream->data_received = 1;
351 }
352
353 b = apr_bucket_transient_create((const char*)data, len,
354 stream->r->connection->bucket_alloc);
355 APR_BRIGADE_INSERT_TAIL(stream->output, b);
356 /* always flush after a DATA frame, as we have no other indication
357 * of buffer use */
358 b = apr_bucket_flush_create(stream->r->connection->bucket_alloc);
359 APR_BRIGADE_INSERT_TAIL(stream->output, b);
360
361 ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, stream->r, APLOGNO(03359)
362 "h2_proxy_session(%s): pass response data for "
363 "stream %d, %d bytes", session->id, stream_id, (int)len);
364 status = ap_pass_brigade(stream->r->output_filters, stream->output);
365 if (status != APR_SUCCESS) {
366 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, APLOGNO(03344)
367 "h2_proxy_session(%s): passing output on stream %d",
368 session->id, stream->id);
369 nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE,
370 stream_id, NGHTTP2_STREAM_CLOSED);
371 return NGHTTP2_ERR_STREAM_CLOSING;
372 }
373 return 0;
374 }
375
376 static int on_stream_close(nghttp2_session *ngh2, int32_t stream_id,
377 uint32_t error_code, void *user_data)
378 {
379 h2_proxy_session *session = user_data;
380 if (!session->aborted) {
381 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03360)
382 "h2_proxy_session(%s): stream=%d, closed, err=%d",
383 session->id, stream_id, error_code);
384 dispatch_event(session, H2_PROXYS_EV_STREAM_DONE, stream_id, NULL);
385 }
386 return 0;
387 }
388
389 static int on_header(nghttp2_session *ngh2, const nghttp2_frame *frame,
390 const uint8_t *namearg, size_t nlen,
391 const uint8_t *valuearg, size_t vlen, uint8_t flags,
392 void *user_data)
393 {
394 h2_proxy_session *session = user_data;
395 h2_proxy_stream *stream;
396 const char *n = (const char*)namearg;
397 const char *v = (const char*)valuearg;
398
399 (void)session;
400 if (frame->hd.type == NGHTTP2_HEADERS && nlen) {
401 stream = nghttp2_session_get_stream_user_data(ngh2, frame->hd.stream_id);
402 if (stream) {
403 if (h2_proxy_stream_add_header_out(stream, n, nlen, v, vlen)) {
404 return NGHTTP2_ERR_CALLBACK_FAILURE;
405 }
406 }
407 }
408 else if (frame->hd.type == NGHTTP2_PUSH_PROMISE) {
409 }
410
411 return 0;
412 }
413
414 static ssize_t stream_data_read(nghttp2_session *ngh2, int32_t stream_id,
415 uint8_t *buf, size_t length,
416 uint32_t *data_flags,
417 nghttp2_data_source *source, void *user_data)
418 {
419 h2_proxy_stream *stream;
420 apr_status_t status = APR_SUCCESS;
421
422 *data_flags = 0;
423 stream = nghttp2_session_get_stream_user_data(ngh2, stream_id);
424 if (!stream) {
425 ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(03361)
426 "h2_proxy_stream(%s): data_read, stream %d not found",
427 stream->session->id, stream_id);
428 return NGHTTP2_ERR_CALLBACK_FAILURE;
429 }
430
431 if (APR_BRIGADE_EMPTY(stream->input)) {
432 status = ap_get_brigade(stream->r->input_filters, stream->input,
433 AP_MODE_READBYTES, APR_NONBLOCK_READ,
434 H2MAX(APR_BUCKET_BUFF_SIZE, length));
435 ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, stream->r,
436 "h2_proxy_stream(%s-%d): request body read",
437 stream->session->id, stream->id);
438 }
439
440 if (status == APR_SUCCESS) {
441 ssize_t readlen = 0;
442 while (status == APR_SUCCESS
443 && (readlen < length)
444 && !APR_BRIGADE_EMPTY(stream->input)) {
445 apr_bucket* b = APR_BRIGADE_FIRST(stream->input);
446 if (APR_BUCKET_IS_METADATA(b)) {
447 if (APR_BUCKET_IS_EOS(b)) {
448 *data_flags |= NGHTTP2_DATA_FLAG_EOF;
449 }
450 else {
451 /* we do nothing more regarding any meta here */
452 }
453 }
454 else {
455 const char *bdata = NULL;
456 apr_size_t blen = 0;
457 status = apr_bucket_read(b, &bdata, &blen, APR_BLOCK_READ);
458
459 if (status == APR_SUCCESS && blen > 0) {
460 ssize_t copylen = H2MIN(length - readlen, blen);
461 memcpy(buf, bdata, copylen);
462 buf += copylen;
463 readlen += copylen;
464 if (copylen < blen) {
465 /* We have data left in the bucket. Split it. */
466 status = apr_bucket_split(b, copylen);
467 }
468 }
469 }
470 apr_bucket_delete(b);
471 }
472
473 ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, stream->r,
474 "h2_proxy_stream(%d): request body read %ld bytes, flags=%d",
475 stream->id, (long)readlen, (int)*data_flags);
476 return readlen;
477 }
478 else if (APR_STATUS_IS_EAGAIN(status)) {
479 /* suspended stream, needs to be re-awakened */
480 ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, stream->r,
481 "h2_proxy_stream(%s-%d): suspending",
482 stream->session->id, stream_id);
483 stream->suspended = 1;
484 h2_iq_add(stream->session->suspended, stream->id, NULL, NULL);
485 return NGHTTP2_ERR_DEFERRED;
486 }
487 else {
488 nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE,
489 stream_id, NGHTTP2_STREAM_CLOSED);
490 return NGHTTP2_ERR_STREAM_CLOSING;
491 }
492 }
493
494 h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn,
495 proxy_server_conf *conf,
496 unsigned char window_bits_connection,
497 unsigned char window_bits_stream,
498 h2_proxy_request_done *done)
499 {
500 if (!p_conn->data) {
501 apr_pool_t *pool = p_conn->scpool;
502 h2_proxy_session *session;
503 nghttp2_session_callbacks *cbs;
504 nghttp2_option *option;
505
506 session = apr_pcalloc(pool, sizeof(*session));
507 apr_pool_pre_cleanup_register(pool, p_conn, proxy_session_pre_close);
508 p_conn->data = session;
509
510 session->id = apr_pstrdup(p_conn->scpool, id);
511 session->c = p_conn->connection;
512 session->p_conn = p_conn;
513 session->conf = conf;
514 session->pool = p_conn->scpool;
515 session->state = H2_PROXYS_ST_INIT;
516 session->window_bits_stream = window_bits_stream;
517 session->window_bits_connection = window_bits_connection;
518 session->streams = h2_ihash_create(pool, offsetof(h2_proxy_stream, id));
519 session->suspended = h2_iq_create(pool, 5);
520 session->done = done;
521
522 session->input = apr_brigade_create(session->pool, session->c->bucket_alloc);
523 session->output = apr_brigade_create(session->pool, session->c->bucket_alloc);
524
525 nghttp2_session_callbacks_new(&cbs);
526 nghttp2_session_callbacks_set_on_frame_recv_callback(cbs, on_frame_recv);
527 nghttp2_session_callbacks_set_on_data_chunk_recv_callback(cbs, on_data_chunk_recv);
528 nghttp2_session_callbacks_set_on_stream_close_callback(cbs, on_stream_close);
529 nghttp2_session_callbacks_set_on_header_callback(cbs, on_header);
530 nghttp2_session_callbacks_set_before_frame_send_callback(cbs, before_frame_send);
531 nghttp2_session_callbacks_set_send_callback(cbs, raw_send);
532
533 nghttp2_option_new(&option);
534 nghttp2_option_set_peer_max_concurrent_streams(option, 100);
535 nghttp2_option_set_no_auto_window_update(option, 1);
536
537 nghttp2_session_client_new2(&session->ngh2, cbs, session, option);
538
539 nghttp2_option_del(option);
540 nghttp2_session_callbacks_del(cbs);
541
542 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03362)
543 "setup session for %s", p_conn->hostname);
544 }
545 return p_conn->data;
546 }
547
548 static apr_status_t session_start(h2_proxy_session *session)
549 {
550 nghttp2_settings_entry settings[2];
551 int rv, add_conn_window;
552 apr_socket_t *s;
553
554 s = ap_get_conn_socket(session->c);
555 #if (!defined(WIN32) && !defined(NETWARE)) || defined(DOXYGEN)
556 if (s) {
557 ap_sock_disable_nagle(s);
558 }
559 #endif
560
561 settings[0].settings_id = NGHTTP2_SETTINGS_ENABLE_PUSH;
562 settings[0].value = 0;
563 settings[1].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
564 settings[1].value = (1 << session->window_bits_stream) - 1;
565
566 rv = nghttp2_submit_settings(session->ngh2, NGHTTP2_FLAG_NONE, settings,
567 H2_ALEN(settings));
568
569 /* If the connection window is larger than our default, trigger a WINDOW_UPDATE */
570 add_conn_window = ((1 << session->window_bits_connection) - 1 -
571 NGHTTP2_INITIAL_CONNECTION_WINDOW_SIZE);
572 if (!rv && add_conn_window != 0) {
573 rv = nghttp2_submit_window_update(session->ngh2, NGHTTP2_FLAG_NONE, 0, add_conn_window);
574 }
575 return rv? APR_EGENERAL : APR_SUCCESS;
576 }
577
578 static apr_status_t open_stream(h2_proxy_session *session, const char *url,
579 request_rec *r, h2_proxy_stream **pstream)
580 {
581 h2_proxy_stream *stream;
582 apr_uri_t puri;
583 const char *authority, *scheme, *path;
584 apr_status_t status;
585
586 stream = apr_pcalloc(r->pool, sizeof(*stream));
587
588 stream->pool = r->pool;
589 stream->url = url;
590 stream->r = r;
591 stream->session = session;
592 stream->state = H2_STREAM_ST_IDLE;
593
594 stream->input = apr_brigade_create(stream->pool, session->c->bucket_alloc);
595 stream->output = apr_brigade_create(stream->pool, session->c->bucket_alloc);
596
597 stream->req = h2_req_create(1, stream->pool, 0);
598
599 status = apr_uri_parse(stream->pool, url, &puri);
600 if (status != APR_SUCCESS)
601 return status;
602
603 scheme = (strcmp(puri.scheme, "h2")? "http" : "https");
604 authority = puri.hostname;
605 if (!ap_strchr_c(authority, ':') && puri.port
606 && apr_uri_port_of_scheme(scheme) != puri.port) {
607 /* port info missing and port is not default for scheme: append */
608 authority = apr_psprintf(stream->pool, "%s:%d", authority, puri.port);
609 }
610 path = apr_uri_unparse(stream->pool, &puri, APR_URI_UNP_OMITSITEPART);
611 h2_req_make(stream->req, stream->pool, r->method, scheme,
612 authority, path, r->headers_in);
613
614 /* Tuck away all already existing cookies */
615 stream->saves = apr_table_make(r->pool, 2);
616 apr_table_do(add_header, stream->saves, r->headers_out,"Set-Cookie", NULL);
617
618 *pstream = stream;
619
620 return APR_SUCCESS;
621 }
622
623 static apr_status_t submit_stream(h2_proxy_session *session, h2_proxy_stream *stream)
624 {
625 h2_ngheader *hd;
626 nghttp2_data_provider *pp = NULL;
627 nghttp2_data_provider provider;
628 int rv;
629 apr_status_t status;
630
631 hd = h2_util_ngheader_make_req(stream->pool, stream->req);
632
633 status = ap_get_brigade(stream->r->input_filters, stream->input,
634 AP_MODE_READBYTES, APR_NONBLOCK_READ,
635 APR_BUCKET_BUFF_SIZE);
636 if ((status == APR_SUCCESS && !APR_BUCKET_IS_EOS(APR_BRIGADE_FIRST(stream->input)))
637 || APR_STATUS_IS_EAGAIN(status)) {
638 /* there might be data coming */
639 provider.source.fd = 0;
640 provider.source.ptr = NULL;
641 provider.read_callback = stream_data_read;
642 pp = &provider;
643 }
644
645 rv = nghttp2_submit_request(session->ngh2, NULL,
646 hd->nv, hd->nvlen, pp, stream);
647
648 if (APLOGcdebug(session->c)) {
649 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03363)
650 "h2_proxy_session(%s): submit %s%s -> %d",
651 session->id, stream->req->authority, stream->req->path,
652 rv);
653 }
654
655 if (rv > 0) {
656 stream->id = rv;
657 stream->state = H2_STREAM_ST_OPEN;
658 h2_ihash_add(session->streams, stream);
659 dispatch_event(session, H2_PROXYS_EV_STREAM_SUBMITTED, rv, NULL);
660
661 return APR_SUCCESS;
662 }
663 return APR_EGENERAL;
664 }
665
666 static apr_status_t feed_brigade(h2_proxy_session *session, apr_bucket_brigade *bb)
667 {
668 apr_status_t status = APR_SUCCESS;
669 apr_size_t readlen = 0;
670 ssize_t n;
671
672 while (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(bb)) {
673 apr_bucket* b = APR_BRIGADE_FIRST(bb);
674
675 if (APR_BUCKET_IS_METADATA(b)) {
676 /* nop */
677 }
678 else {
679 const char *bdata = NULL;
680 apr_size_t blen = 0;
681
682 status = apr_bucket_read(b, &bdata, &blen, APR_BLOCK_READ);
683 if (status == APR_SUCCESS && blen > 0) {
684 n = nghttp2_session_mem_recv(session->ngh2, (const uint8_t *)bdata, blen);
685 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
686 "h2_proxy_session(%s): feeding %ld bytes -> %ld",
687 session->id, (long)blen, (long)n);
688 if (n < 0) {
689 if (nghttp2_is_fatal((int)n)) {
690 status = APR_EGENERAL;
691 }
692 }
693 else {
694 readlen += n;
695 if (n < blen) {
696 apr_bucket_split(b, n);
697 }
698 }
699 }
700 }
701 apr_bucket_delete(b);
702 }
703
704 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
705 "h2_proxy_session(%s): fed %ld bytes of input to session",
706 session->id, (long)readlen);
707 if (readlen == 0 && status == APR_SUCCESS) {
708 return APR_EAGAIN;
709 }
710 return status;
711 }
712
713 static apr_status_t h2_proxy_session_read(h2_proxy_session *session, int block,
714 apr_interval_time_t timeout)
715 {
716 apr_status_t status = APR_SUCCESS;
717
718 if (APR_BRIGADE_EMPTY(session->input)) {
719 apr_socket_t *socket = NULL;
720 apr_time_t save_timeout = -1;
721
722 if (block) {
723 socket = ap_get_conn_socket(session->c);
724 if (socket) {
725 apr_socket_timeout_get(socket, &save_timeout);
726 apr_socket_timeout_set(socket, timeout);
727 }
728 else {
729 /* cannot block on timeout */
730 ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, session->c, APLOGNO(03379)
731 "h2_proxy_session(%s): unable to get conn socket",
732 session->id);
733 return APR_ENOTIMPL;
734 }
735 }
736
737 status = ap_get_brigade(session->c->input_filters, session->input,
738 AP_MODE_READBYTES,
739 block? APR_BLOCK_READ : APR_NONBLOCK_READ,
740 64 * 1024);
741 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c,
742 "h2_proxy_session(%s): read from conn", session->id);
743 if (socket && save_timeout != -1) {
744 apr_socket_timeout_set(socket, save_timeout);
745 }
746 }
747
748 if (status == APR_SUCCESS) {
749 status = feed_brigade(session, session->input);
750 }
751 else if (APR_STATUS_IS_TIMEUP(status)) {
752 /* nop */
753 }
754 else if (!APR_STATUS_IS_EAGAIN(status)) {
755 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, APLOGNO(03380)
756 "h2_proxy_session(%s): read error", session->id);
757 dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, status, NULL);
758 }
759
760 return status;
761 }
762
763 apr_status_t h2_proxy_session_submit(h2_proxy_session *session,
764 const char *url, request_rec *r)
765 {
766 h2_proxy_stream *stream;
767 apr_status_t status;
768
769 status = open_stream(session, url, r, &stream);
770 if (status == APR_SUCCESS) {
771 ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, APLOGNO(03381)
772 "process stream(%d): %s %s%s, original: %s",
773 stream->id, stream->req->method,
774 stream->req->authority, stream->req->path,
775 r->the_request);
776 status = submit_stream(session, stream);
777 }
778 return status;
779 }
780
781 static apr_status_t check_suspended(h2_proxy_session *session)
782 {
783 h2_proxy_stream *stream;
784 int i, stream_id;
785 apr_status_t status;
786
787 for (i = 0; i < session->suspended->nelts; ++i) {
788 stream_id = session->suspended->elts[i];
789 stream = nghttp2_session_get_stream_user_data(session->ngh2, stream_id);
790 if (stream) {
791 status = ap_get_brigade(stream->r->input_filters, stream->input,
792 AP_MODE_READBYTES, APR_NONBLOCK_READ,
793 APR_BUCKET_BUFF_SIZE);
794 if (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(stream->input)) {
795 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c,
796 "h2_proxy_stream(%s-%d): resuming",
797 session->id, stream_id);
798 stream->suspended = 0;
799 h2_iq_remove(session->suspended, stream_id);
800 nghttp2_session_resume_data(session->ngh2, stream_id);
801 dispatch_event(session, H2_PROXYS_EV_STREAM_RESUMED, 0, NULL);
802 check_suspended(session);
803 return APR_SUCCESS;
804 }
805 else if (status != APR_SUCCESS && !APR_STATUS_IS_EAGAIN(status)) {
806 ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, session->c,
807 APLOGNO(03382) "h2_proxy_stream(%s-%d): check input",
808 session->id, stream_id);
809 h2_iq_remove(session->suspended, stream_id);
810 dispatch_event(session, H2_PROXYS_EV_STREAM_RESUMED, 0, NULL);
811 check_suspended(session);
812 return APR_SUCCESS;
813 }
814 }
815 else {
816 /* gone? */
817 h2_iq_remove(session->suspended, stream_id);
818 check_suspended(session);
819 return APR_SUCCESS;
820 }
821 }
822 return APR_EAGAIN;
823 }
824
825 static apr_status_t session_shutdown(h2_proxy_session *session, int reason,
826 const char *msg)
827 {
828 apr_status_t status = APR_SUCCESS;
829 const char *err = msg;
830
831 AP_DEBUG_ASSERT(session);
832 if (!err && reason) {
833 err = nghttp2_strerror(reason);
834 }
835 nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, 0,
836 reason, (uint8_t*)err, err? strlen(err):0);
837 status = nghttp2_session_send(session->ngh2);
838 dispatch_event(session, H2_PROXYS_EV_LOCAL_GOAWAY, reason, err);
839 return status;
840 }
841
842
843 static const char *StateNames[] = {
844 "INIT", /* H2_PROXYS_ST_INIT */
845 "DONE", /* H2_PROXYS_ST_DONE */
846 "IDLE", /* H2_PROXYS_ST_IDLE */
847 "BUSY", /* H2_PROXYS_ST_BUSY */
848 "WAIT", /* H2_PROXYS_ST_WAIT */
849 "LSHUTDOWN", /* H2_PROXYS_ST_LOCAL_SHUTDOWN */
850 "RSHUTDOWN", /* H2_PROXYS_ST_REMOTE_SHUTDOWN */
851 };
852
853 static const char *state_name(h2_proxys_state state)
854 {
855 if (state >= (sizeof(StateNames)/sizeof(StateNames[0]))) {
856 return "unknown";
857 }
858 return StateNames[state];
859 }
860
861 static int is_accepting_streams(h2_proxy_session *session)
862 {
863 switch (session->state) {
864 case H2_PROXYS_ST_IDLE:
865 case H2_PROXYS_ST_BUSY:
866 case H2_PROXYS_ST_WAIT:
867 return 1;
868 default:
869 return 0;
870 }
871 }
872
873 static void transit(h2_proxy_session *session, const char *action,
874 h2_proxys_state nstate)
875 {
876 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03345)
877 "h2_proxy_session(%s): transit [%s] -- %s --> [%s]", session->id,
878 state_name(session->state), action, state_name(nstate));
879 session->state = nstate;
880 }
881
882 static void ev_init(h2_proxy_session *session, int arg, const char *msg)
883 {
884 switch (session->state) {
885 case H2_PROXYS_ST_INIT:
886 if (h2_ihash_empty(session->streams)) {
887 transit(session, "init", H2_PROXYS_ST_IDLE);
888 }
889 else {
890 transit(session, "init", H2_PROXYS_ST_BUSY);
891 }
892 break;
893
894 default:
895 /* nop */
896 break;
897 }
898 }
899
900 static void ev_local_goaway(h2_proxy_session *session, int arg, const char *msg)
901 {
902 switch (session->state) {
903 case H2_PROXYS_ST_LOCAL_SHUTDOWN:
904 /* already did that? */
905 break;
906 case H2_PROXYS_ST_IDLE:
907 case H2_PROXYS_ST_REMOTE_SHUTDOWN:
908 /* all done */
909 transit(session, "local goaway", H2_PROXYS_ST_DONE);
910 break;
911 default:
912 transit(session, "local goaway", H2_PROXYS_ST_LOCAL_SHUTDOWN);
913 break;
914 }
915 }
916
917 static void ev_remote_goaway(h2_proxy_session *session, int arg, const char *msg)
918 {
919 switch (session->state) {
920 case H2_PROXYS_ST_REMOTE_SHUTDOWN:
921 /* already received that? */
922 break;
923 case H2_PROXYS_ST_IDLE:
924 case H2_PROXYS_ST_LOCAL_SHUTDOWN:
925 /* all done */
926 transit(session, "remote goaway", H2_PROXYS_ST_DONE);
927 break;
928 default:
929 transit(session, "remote goaway", H2_PROXYS_ST_REMOTE_SHUTDOWN);
930 break;
931 }
932 }
933
934 static void ev_conn_error(h2_proxy_session *session, int arg, const char *msg)
935 {
936 switch (session->state) {
937 case H2_PROXYS_ST_INIT:
938 case H2_PROXYS_ST_DONE:
939 case H2_PROXYS_ST_LOCAL_SHUTDOWN:
940 /* just leave */
941 transit(session, "conn error", H2_PROXYS_ST_DONE);
942 break;
943
944 default:
945 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, arg, session->c,
946 "h2_proxy_session(%s): conn error -> shutdown", session->id);
947 session_shutdown(session, arg, msg);
948 break;
949 }
950 }
951
952 static void ev_proto_error(h2_proxy_session *session, int arg, const char *msg)
953 {
954 switch (session->state) {
955 case H2_PROXYS_ST_DONE:
956 case H2_PROXYS_ST_LOCAL_SHUTDOWN:
957 /* just leave */
958 transit(session, "proto error", H2_PROXYS_ST_DONE);
959 break;
960
961 default:
962 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
963 "h2_proxy_session(%s): proto error -> shutdown", session->id);
964 session_shutdown(session, arg, msg);
965 break;
966 }
967 }
968
969 static void ev_conn_timeout(h2_proxy_session *session, int arg, const char *msg)
970 {
971 switch (session->state) {
972 case H2_PROXYS_ST_LOCAL_SHUTDOWN:
973 transit(session, "conn timeout", H2_PROXYS_ST_DONE);
974 break;
975 default:
976 session_shutdown(session, arg, msg);
977 transit(session, "conn timeout", H2_PROXYS_ST_DONE);
978 break;
979 }
980 }
981
982 static void ev_no_io(h2_proxy_session *session, int arg, const char *msg)
983 {
984 switch (session->state) {
985 case H2_PROXYS_ST_BUSY:
986 case H2_PROXYS_ST_LOCAL_SHUTDOWN:
987 case H2_PROXYS_ST_REMOTE_SHUTDOWN:
988 /* nothing for input and output to do. If we remain
989 * in this state, we go into a tight loop and suck up
990 * CPU cycles. Ideally, we'd like to do a blocking read, but that
991 * is not possible if we have scheduled tasks and wait
992 * for them to produce something. */
993 if (h2_ihash_empty(session->streams)) {
994 if (!is_accepting_streams(session)) {
995 /* We are no longer accepting new streams and have
996 * finished processing existing ones. Time to leave. */
997 session_shutdown(session, arg, msg);
998 transit(session, "no io", H2_PROXYS_ST_DONE);
999 }
1000 else {
1001 /* When we have no streams, no task events are possible,
1002 * switch to blocking reads */
1003 transit(session, "no io", H2_PROXYS_ST_IDLE);
1004 }
1005 }
1006 else {
1007 /* Unable to do blocking reads, as we wait on events from
1008 * task processing in other threads. Do a busy wait with
1009 * backoff timer. */
1010 transit(session, "no io", H2_PROXYS_ST_WAIT);
1011 }
1012 break;
1013 default:
1014 /* nop */
1015 break;
1016 }
1017 }
1018
1019 static void ev_stream_submitted(h2_proxy_session *session, int stream_id,
1020 const char *msg)
1021 {
1022 switch (session->state) {
1023 case H2_PROXYS_ST_IDLE:
1024 case H2_PROXYS_ST_WAIT:
1025 transit(session, "stream submitted", H2_PROXYS_ST_BUSY);
1026 break;
1027 default:
1028 /* nop */
1029 break;
1030 }
1031 }
1032
1033 static void ev_stream_done(h2_proxy_session *session, int stream_id,
1034 const char *msg)
1035 {
1036 h2_proxy_stream *stream;
1037
1038 stream = nghttp2_session_get_stream_user_data(session->ngh2, stream_id);
1039 if (stream) {
1040 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03364)
1041 "h2_proxy_sesssion(%s): stream(%d) closed",
1042 session->id, stream_id);
1043
1044 if (!stream->data_received) {
1045 apr_bucket *b;
1046 /* if the response had no body, this is the time to flush
1047 * an empty brigade which will also "write" the resonse
1048 * headers */
1049 h2_proxy_stream_end_headers_out(stream);
1050 stream->data_received = 1;
1051 b = apr_bucket_flush_create(stream->r->connection->bucket_alloc);
1052 APR_BRIGADE_INSERT_TAIL(stream->output, b);
1053 b = apr_bucket_eos_create(stream->r->connection->bucket_alloc);
1054 APR_BRIGADE_INSERT_TAIL(stream->output, b);
1055 ap_pass_brigade(stream->r->output_filters, stream->output);
1056 }
1057
1058 stream->state = H2_STREAM_ST_CLOSED;
1059 h2_ihash_remove(session->streams, stream_id);
1060 h2_iq_remove(session->suspended, stream_id);
1061 if (session->done) {
1062 session->done(session, stream->r, 1, 1);
1063 }
1064 }
1065
1066 switch (session->state) {
1067 default:
1068 /* nop */
1069 break;
1070 }
1071 }
1072
1073 static void ev_stream_resumed(h2_proxy_session *session, int arg, const char *msg)
1074 {
1075 switch (session->state) {
1076 case H2_PROXYS_ST_WAIT:
1077 transit(session, "stream resumed", H2_PROXYS_ST_BUSY);
1078 break;
1079 default:
1080 /* nop */
1081 break;
1082 }
1083 }
1084
1085 static void ev_data_read(h2_proxy_session *session, int arg, const char *msg)
1086 {
1087 switch (session->state) {
1088 case H2_PROXYS_ST_IDLE:
1089 case H2_PROXYS_ST_WAIT:
1090 transit(session, "data read", H2_PROXYS_ST_BUSY);
1091 break;
1092 default:
1093 /* nop */
1094 break;
1095 }
1096 }
1097
1098 static void ev_ngh2_done(h2_proxy_session *session, int arg, const char *msg)
1099 {
1100 switch (session->state) {
1101 case H2_PROXYS_ST_DONE:
1102 /* nop */
1103 break;
1104 default:
1105 transit(session, "nghttp2 done", H2_PROXYS_ST_DONE);
1106 break;
1107 }
1108 }
1109
1110 static void ev_pre_close(h2_proxy_session *session, int arg, const char *msg)
1111 {
1112 switch (session->state) {
1113 case H2_PROXYS_ST_DONE:
1114 case H2_PROXYS_ST_LOCAL_SHUTDOWN:
1115 /* nop */
1116 break;
1117 default:
1118 session_shutdown(session, arg, msg);
1119 break;
1120 }
1121 }
1122
1123 static void dispatch_event(h2_proxy_session *session, h2_proxys_event_t ev,
1124 int arg, const char *msg)
1125 {
1126 switch (ev) {
1127 case H2_PROXYS_EV_INIT:
1128 ev_init(session, arg, msg);
1129 break;
1130 case H2_PROXYS_EV_LOCAL_GOAWAY:
1131 ev_local_goaway(session, arg, msg);
1132 break;
1133 case H2_PROXYS_EV_REMOTE_GOAWAY:
1134 ev_remote_goaway(session, arg, msg);
1135 break;
1136 case H2_PROXYS_EV_CONN_ERROR:
1137 ev_conn_error(session, arg, msg);
1138 break;
1139 case H2_PROXYS_EV_PROTO_ERROR:
1140 ev_proto_error(session, arg, msg);
1141 break;
1142 case H2_PROXYS_EV_CONN_TIMEOUT:
1143 ev_conn_timeout(session, arg, msg);
1144 break;
1145 case H2_PROXYS_EV_NO_IO:
1146 ev_no_io(session, arg, msg);
1147 break;
1148 case H2_PROXYS_EV_STREAM_SUBMITTED:
1149 ev_stream_submitted(session, arg, msg);
1150 break;
1151 case H2_PROXYS_EV_STREAM_DONE:
1152 ev_stream_done(session, arg, msg);
1153 break;
1154 case H2_PROXYS_EV_STREAM_RESUMED:
1155 ev_stream_resumed(session, arg, msg);
1156 break;
1157 case H2_PROXYS_EV_DATA_READ:
1158 ev_data_read(session, arg, msg);
1159 break;
1160 case H2_PROXYS_EV_NGH2_DONE:
1161 ev_ngh2_done(session, arg, msg);
1162 break;
1163 case H2_PROXYS_EV_PRE_CLOSE:
1164 ev_pre_close(session, arg, msg);
1165 break;
1166 default:
1167 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
1168 "h2_proxy_session(%s): unknown event %d",
1169 session->id, ev);
1170 break;
1171 }
1172 }
1173
1174 apr_status_t h2_proxy_session_process(h2_proxy_session *session)
1175 {
1176 apr_status_t status;
1177 int have_written = 0, have_read = 0;
1178
1179 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
1180 "h2_proxy_session(%s): process", session->id);
1181
1182 run_loop:
1183 switch (session->state) {
1184 case H2_PROXYS_ST_INIT:
1185 status = session_start(session);
1186 if (status == APR_SUCCESS) {
1187 dispatch_event(session, H2_PROXYS_EV_INIT, 0, NULL);
1188 goto run_loop;
1189 }
1190 else {
1191 dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, status, NULL);
1192 }
1193 break;
1194
1195 case H2_PROXYS_ST_BUSY:
1196 case H2_PROXYS_ST_LOCAL_SHUTDOWN:
1197 case H2_PROXYS_ST_REMOTE_SHUTDOWN:
1198 while (nghttp2_session_want_write(session->ngh2)) {
1199 int rv = nghttp2_session_send(session->ngh2);
1200 if (rv < 0 && nghttp2_is_fatal(rv)) {
1201 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
1202 "h2_proxy_session(%s): write, rv=%d", session->id, rv);
1203 dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, rv, NULL);
1204 break;
1205 }
1206 have_written = 1;
1207 }
1208
1209 if (nghttp2_session_want_read(session->ngh2)) {
1210 status = h2_proxy_session_read(session, 0, 0);
1211 if (status == APR_SUCCESS) {
1212 have_read = 1;
1213 }
1214 }
1215
1216 if (!have_written && !have_read
1217 && !nghttp2_session_want_write(session->ngh2)) {
1218 dispatch_event(session, H2_PROXYS_EV_NO_IO, 0, NULL);
1219 goto run_loop;
1220 }
1221 break;
1222
1223 case H2_PROXYS_ST_WAIT:
1224 if (check_suspended(session) == APR_EAGAIN) {
1225 /* no stream has become resumed. Do a blocking read with
1226 * ever increasing timeouts... */
1227 if (session->wait_timeout < 25) {
1228 session->wait_timeout = 25;
1229 }
1230 else {
1231 session->wait_timeout = H2MIN(apr_time_from_msec(100),
1232 2*session->wait_timeout);
1233 }
1234
1235 status = h2_proxy_session_read(session, 1, session->wait_timeout);
1236 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
1237 APLOGNO(03365)
1238 "h2_proxy_session(%s): WAIT read, timeout=%fms",
1239 session->id, (float)session->wait_timeout/1000.0);
1240 if (status == APR_SUCCESS) {
1241 have_read = 1;
1242 dispatch_event(session, H2_PROXYS_EV_DATA_READ, 0, NULL);
1243 }
1244 else if (APR_STATUS_IS_TIMEUP(status)
1245 || APR_STATUS_IS_EAGAIN(status)) {
1246 /* go back to checking all inputs again */
1247 transit(session, "wait cycle", H2_PROXYS_ST_BUSY);
1248 }
1249 }
1250 break;
1251
1252 case H2_PROXYS_ST_IDLE:
1253 break;
1254
1255 case H2_PROXYS_ST_DONE: /* done, session terminated */
1256 return APR_EOF;
1257
1258 default:
1259 ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_EGENERAL, session->c,
1260 APLOGNO(03346)"h2_proxy_session(%s): unknown state %d",
1261 session->id, session->state);
1262 dispatch_event(session, H2_PROXYS_EV_PROTO_ERROR, 0, NULL);
1263 break;
1264 }
1265
1266
1267 if (have_read || have_written) {
1268 session->wait_timeout = 0;
1269 }
1270
1271 if (!nghttp2_session_want_read(session->ngh2)
1272 && !nghttp2_session_want_write(session->ngh2)) {
1273 dispatch_event(session, H2_PROXYS_EV_NGH2_DONE, 0, NULL);
1274 }
1275
1276 return APR_SUCCESS; /* needs to be called again */
1277 }
1278
1279 typedef struct {
1280 h2_proxy_session *session;
1281 h2_proxy_request_done *done;
1282 } cleanup_iter_ctx;
1283
1284 static int done_iter(void *udata, void *val)
1285 {
1286 cleanup_iter_ctx *ctx = udata;
1287 h2_proxy_stream *stream = val;
1288 int touched = (!ctx->session->last_stream_id ||
1289 stream->id <= ctx->session->last_stream_id);
1290 ctx->done(ctx->session, stream->r, 0, touched);
1291 return 1;
1292 }
1293
1294 void h2_proxy_session_cleanup(h2_proxy_session *session,
1295 h2_proxy_request_done *done)
1296 {
1297 if (session->streams && !h2_ihash_empty(session->streams)) {
1298 cleanup_iter_ctx ctx;
1299 ctx.session = session;
1300 ctx.done = done;
1301 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03366)
1302 "h2_proxy_session(%s): terminated, %d streams unfinished",
1303 session->id, (int)h2_ihash_count(session->streams));
1304 h2_ihash_iter(session->streams, done_iter, &ctx);
1305 h2_ihash_clear(session->streams);
1306 }
1307 }
1308
1309 typedef struct {
1310 h2_proxy_session *session;
1311 conn_rec *c;
1312 apr_off_t bytes;
1313 int updated;
1314 } win_update_ctx;
1315
1316 static int win_update_iter(void *udata, void *val)
1317 {
1318 win_update_ctx *ctx = udata;
1319 h2_proxy_stream *stream = val;
1320
1321 if (stream->r && stream->r->connection == ctx->c) {
1322 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, ctx->session->c,
1323 "h2_proxy_session(%s-%d): win_update %ld bytes",
1324 ctx->session->id, (int)stream->id, (long)ctx->bytes);
1325 nghttp2_session_consume(ctx->session->ngh2, stream->id, ctx->bytes);
1326 ctx->updated = 1;
1327 return 0;
1328 }
1329 return 1;
1330 }
1331
1332
1333 void h2_proxy_session_update_window(h2_proxy_session *session,
1334 conn_rec *c, apr_off_t bytes)
1335 {
1336 if (session->streams && !h2_ihash_empty(session->streams)) {
1337 win_update_ctx ctx;
1338 ctx.session = session;
1339 ctx.c = c;
1340 ctx.bytes = bytes;
1341 ctx.updated = 0;
1342 h2_ihash_iter(session->streams, win_update_iter, &ctx);
1343
1344 if (!ctx.updated) {
1345 /* could not find the stream any more, possibly closed, update
1346 * the connection window at least */
1347 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
1348 "h2_proxy_session(%s): win_update conn %ld bytes",
1349 session->id, (long)bytes);
1350 nghttp2_session_consume_connection(session->ngh2, (size_t)bytes);
1351 }
1352 }
1353 }
1354