uint rx_head, rx_tail; /* head and tail of rx buffer in the conn's shared rx buf */
uint rx_count; /* total number of allocated rxbufs */
/* 4 bytes hole here */
- struct buffer rxbuf; /* receive buffer, always valid (buf_empty or real buffer) */
struct wait_event *subs; /* recv wait_event the associated stream connector is waiting on (via h2_subscribe) */
struct list list; /* To be used when adding to h2c->send_list or h2c->fctl_list */
struct tasklet *shut_tl; /* deferred shutdown tasklet, to retry sending an RST after we failed to,
return ret;
}
+/* Returns a pointer to the oldest rxbuf of the stream, which must exist.
+ * Note that this doesn't indicate that the buffer is allocated nor contains
+ * any data.
+ */
+static inline struct buffer *_h2s_rxbuf_head(const struct h2s *h2s)
+{
+ return &h2s->h2c->shared_rx_bufs[h2s->rx_head].buf;
+}
+
+/* Returns a pointer to the newest rxbuf of the stream, which must exist.
+ * Note that this doesn't indicate that the buffer is allocated nor contains
+ * any data.
+ */
+static inline struct buffer *_h2s_rxbuf_tail(const struct h2s *h2s)
+{
+ return &h2s->h2c->shared_rx_bufs[h2s->rx_tail].buf;
+}
+
/* Returns a pointer to the oldest rxbuf of the stream, or NULL if there is
* none. Note that this doesn't indicate that the buffer is allocated nor
* contains any data.
*/
static inline struct buffer *h2s_rxbuf_head(const struct h2s *h2s)
{
- if (!h2s->rx_head)
- return NULL;
- return &h2s->h2c->shared_rx_bufs[h2s->rx_head].buf;
+ return h2s->rx_head ? _h2s_rxbuf_head(h2s) : NULL;
}
/* Returns a pointer to the newest rxbuf of the stream, or NULL if there is
 * none. Note that this doesn't indicate that the buffer is allocated nor
 * contains any data.
 */
static inline struct buffer *h2s_rxbuf_tail(const struct h2s *h2s)
{
- if (!h2s->rx_tail)
- return NULL;
- return &h2s->h2c->shared_rx_bufs[h2s->rx_tail].buf;
+ return h2s->rx_tail ? _h2s_rxbuf_tail(h2s) : NULL;
}
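
(For readers unfamiliar with the bl_* buffer-list API used above: the program
below is a minimal standalone model of the mechanism these helpers rely on,
with toy types instead of HAProxy's structures. Streams keep head/tail indices
into a shared slot array, slots are chained through a "next" index, and index
0 doubles as "no buffer", which is exactly why the checked accessors above
test rx_head/rx_tail against zero.)

#include <stdio.h>

#define NBSLOTS 8

struct slot {
	unsigned int next;   /* next slot in the chain, 0 = end/none */
	int data;            /* stand-in for struct buffer */
};

static struct slot slots[NBSLOTS];
static unsigned int free_head;   /* head of the free list */

static void slots_init(void)
{
	unsigned int i;

	/* chain all usable slots (1..NBSLOTS-1) into the free list */
	for (i = 1; i < NBSLOTS - 1; i++)
		slots[i].next = i + 1;
	slots[NBSLOTS - 1].next = 0;
	free_head = 1;
}

/* take a slot from the free list, 0 if none left */
static unsigned int slot_get(void)
{
	unsigned int idx = free_head;

	if (idx) {
		free_head = slots[idx].next;
		slots[idx].next = 0;
	}
	return idx;
}

/* release <idx> back to the free list and return the slot it pointed to,
 * i.e. the new head of the caller's chain (mirrors how bl_put() is used
 * further down in this patch)
 */
static unsigned int slot_put(unsigned int idx)
{
	unsigned int next = slots[idx].next;

	slots[idx].next = free_head;
	free_head = idx;
	return next;
}

int main(void)
{
	unsigned int rx_head, rx_tail, idx;

	slots_init();

	/* append two buffers to a stream's chain (the h2s_get_rxbuf() role) */
	rx_head = rx_tail = slot_get();
	idx = slot_get();
	slots[rx_tail].next = idx;
	rx_tail = idx;

	/* consume from the head (the h2s_put_rxbuf() role) */
	while (rx_head) {
		printf("releasing slot %u\n", rx_head);
		rx_head = slot_put(rx_head);
	}
	return 0;
}
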
/* Returns the number of allocated rxbuf slots for the stream */
if ((h2c->flags & H2_CF_DEM_SALLOC) &&
(h2s = h2c_st_by_id(h2c, h2c->dsi)) && h2s_sc(h2s) &&
- b_alloc(&h2s->rxbuf, DB_SE_RX)) {
- h2c->flags &= ~H2_CF_DEM_SALLOC;
- h2c_restart_reading(h2c, 1);
- return 1;
+ (h2s_rxbuf_tail(h2s) || h2s_get_rxbuf(h2s))) {
+ h2c->flags &= ~(H2_CF_DEM_RXBUF | H2_CF_DEM_SFULL);
+ if (b_alloc(_h2s_rxbuf_tail(h2s), DB_SE_RX)) {
+ h2c->flags &= ~H2_CF_DEM_SALLOC;
+ h2c_restart_reading(h2c, 1);
+ return 1;
+ }
}
return 0;
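
(Two distinct blocking conditions are involved in this retry path:
H2_CF_DEM_RXBUF means the stream could not obtain a slot in the shared ring,
while H2_CF_DEM_SALLOC means the slot exists but its storage could not be
allocated. The lines below are a condensed restatement of the blocking-side
counterpart of the test above, using the patch's own identifiers; a sketch,
not compilable on its own.)

	if (!h2s_rxbuf_tail(h2s) && !h2s_get_rxbuf(h2s))
		h2c->flags |= H2_CF_DEM_RXBUF;    /* no ring slot available */
	else if (!b_alloc(_h2s_rxbuf_tail(h2s), DB_SE_RX))
		h2c->flags |= H2_CF_DEM_SALLOC;   /* slot OK, no memory yet */
	else
		h2c_restart_reading(h2c, 1);      /* both satisfied, resume */
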
h2c->st0 = H2_CS_PREFACE;
h2c->conn = conn;
h2c->streams_limit = h2c_max_concurrent_streams(h2c);
+ bl_init(h2c->shared_rx_bufs, h2c->streams_limit + 1);
+
h2c->max_id = -1;
h2c->errcode = H2_ERR_NO_ERROR;
h2c->rcvd_c = 0;
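
(The "streams_limit + 1" sizing follows from the accessors earlier in this
patch: rx_head == 0 / rx_tail == 0 encodes "no rxbuf", so slot 0 can never be
handed to a stream, and one extra element is needed to still offer
streams_limit usable slots. Illustrative sizing, assuming the common default
of 100 concurrent streams:)

	/* h2c_max_concurrent_streams() == 100
	 *   => bl_init(h2c->shared_rx_bufs, 101)
	 *   => usable slots are 1..100, slot 0 stays reserved
	 */
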
if (!h2s->id)
h2s->h2c->nb_reserved--;
if (h2s->sd && h2s_sc(h2s)) {
- if (!se_fl_test(h2s->sd, SE_FL_EOS) && !b_data(&h2s->rxbuf))
+ if (!se_fl_test(h2s->sd, SE_FL_EOS) &&
+ (!h2s_rxbuf_head(h2s) || !b_data(h2s_rxbuf_head(h2s))))
h2s_notify_recv(h2s);
}
HA_ATOMIC_DEC(&h2s->h2c->px_counters->open_streams);
static void h2s_destroy(struct h2s *h2s)
{
struct connection *conn = h2s->h2c->conn;
+ int freed = 0;
TRACE_ENTER(H2_EV_H2S_END, conn, h2s);
h2s_close(h2s);
eb32_delete(&h2s->by_id);
- if (b_size(&h2s->rxbuf)) {
- b_free(&h2s->rxbuf);
- offer_buffers(NULL, 1);
+
+ while (h2s->rx_head) {
+ b_free(&h2s->h2c->shared_rx_bufs[h2s->rx_head].buf);
+ h2s->rx_head = bl_put(h2s->h2c->shared_rx_bufs, h2s->rx_head);
+ freed++;
+ }
+
+ if (freed) {
+ offer_buffers(NULL, freed);
+ if (h2s->h2c->flags & H2_CF_DEM_RXBUF) {
+ /* just released resources the demux is waiting for */
+ h2s->h2c->flags &= ~(H2_CF_DEM_SFULL | H2_CF_DEM_RXBUF);
+ h2c_restart_reading(h2s->h2c, 1);
+ }
}
if (h2s->subs)
h2s->st = H2_SS_IDLE;
h2s->status = 0;
h2s->body_len = 0;
- h2s->rxbuf = BUF_NULL;
h2s->rx_tail = 0;
h2s->rx_head = 0;
h2s->rx_count = 0;
if (h2s->st != H2_SS_IDLE) {
/* The stream exists/existed, this must be a trailers frame */
if (h2s->st != H2_SS_CLOSED) {
- error = h2c_dec_hdrs(h2c, &h2s->rxbuf, &h2s->flags, &body_len, NULL);
+ if (!h2s_rxbuf_tail(h2s) && !h2s_get_rxbuf(h2s)) {
+ TRACE_USER("Not allowed to get an extra buffer for H2 request trailers", H2_EV_RX_FRAME|H2_EV_RX_HDR|H2_EV_STRM_NEW|H2_EV_STRM_END, h2c->conn, );
+ h2c->flags |= H2_CF_DEM_RXBUF;
+ goto out;
+ }
+
+ error = h2c_dec_hdrs(h2c, h2s_rxbuf_tail(h2s), &h2s->flags, &body_len, NULL);
/* unrecoverable error ? */
if (h2c->st0 >= H2_CS_ERROR) {
- TRACE_USER("Unrecoverable error decoding H2 trailers", H2_EV_RX_FRAME|H2_EV_RX_HDR|H2_EV_STRM_NEW|H2_EV_STRM_END, h2c->conn, 0, &h2s->rxbuf);
+ TRACE_USER("Unrecoverable error decoding H2 trailers", H2_EV_RX_FRAME|H2_EV_RX_HDR|H2_EV_STRM_NEW|H2_EV_STRM_END, h2c->conn, 0, h2s_rxbuf_tail(h2s));
sess_log(h2c->conn->owner);
goto out;
}
*/
sess_log(h2c->conn->owner);
h2s_error(h2s, H2_ERR_INTERNAL_ERROR);
- TRACE_USER("Stream error decoding H2 trailers", H2_EV_RX_FRAME|H2_EV_RX_HDR|H2_EV_STRM_NEW|H2_EV_STRM_END, h2c->conn, 0, &h2s->rxbuf);
+ TRACE_USER("Stream error decoding H2 trailers", H2_EV_RX_FRAME|H2_EV_RX_HDR|H2_EV_STRM_NEW|H2_EV_STRM_END, h2c->conn, 0, h2s_rxbuf_tail(h2s));
h2c->st0 = H2_CS_FRAME_E;
goto out;
}
goto fail; // incomplete frame
}
- if (h2s->st != H2_SS_CLOSED) {
- error = h2c_dec_hdrs(h2c, &h2s->rxbuf, &h2s->flags, &h2s->body_len, h2s->upgrade_protocol);
- }
- else {
+ if (h2s->st == H2_SS_CLOSED) {
/* the connection was already killed by an RST, let's consume
* the data and send another RST.
*/
goto send_rst;
}
+ if (!h2s_rxbuf_tail(h2s) && !h2s_get_rxbuf(h2s)) {
+ TRACE_USER("Not allowed to get an extra buffer for H2 response HEADERS", H2_EV_RX_FRAME|H2_EV_RX_HDR|H2_EV_STRM_NEW|H2_EV_STRM_END, h2c->conn, );
+ h2c->flags |= H2_CF_DEM_RXBUF;
+ goto fail;
+ }
+
+ error = h2c_dec_hdrs(h2c, h2s_rxbuf_tail(h2s), &h2s->flags, &h2s->body_len, h2s->upgrade_protocol);
+
/* unrecoverable error ? */
if (h2c->st0 >= H2_CS_ERROR) {
TRACE_USER("Unrecoverable error decoding H2 HEADERS", H2_EV_RX_FRAME|H2_EV_RX_HDR, h2c->conn, h2s);
h2s->flags &= ~H2_SF_BLK_MBUSY;
}
- TRACE_USER("rcvd H2 response ", H2_EV_RX_FRAME|H2_EV_RX_HDR, h2c->conn, 0, &h2s->rxbuf);
+ TRACE_USER("rcvd H2 response ", H2_EV_RX_FRAME|H2_EV_RX_HDR, h2c->conn, 0, h2s_rxbuf_tail(h2s));
TRACE_LEAVE(H2_EV_RX_FRAME|H2_EV_RX_HDR, h2c->conn, h2s);
return h2s;
fail:
tmp_h2s = h2c_st_by_id(h2c, h2c->dsi);
if (tmp_h2s != h2s && h2s && h2s_sc(h2s) &&
- (b_data(&h2s->rxbuf) ||
+ ((h2s_rxbuf_head(h2s) && b_data(h2s_rxbuf_head(h2s))) ||
h2c_read0_pending(h2c) ||
h2s->st == H2_SS_CLOSED ||
(h2s->flags & H2_SF_ES_RCVD) ||
h2c->flags &= ~H2_CF_DEM_DFULL;
if (h2s && h2s_sc(h2s) &&
- (b_data(&h2s->rxbuf) ||
+ ((h2s_rxbuf_head(h2s) && b_data(h2s_rxbuf_head(h2s))) ||
h2c_read0_pending(h2c) ||
h2s->st == H2_SS_CLOSED ||
(h2s->flags & H2_SF_ES_RCVD) ||
goto done;
}
-/* Transfer the payload of a DATA frame to the HTTP/1 side. The HTTP/2 frame
+/* Transfer the payload of an H2 DATA frame to the HTX side. The HTTP/2 frame
* parser state is automatically updated. Returns > 0 if it could completely
* send the current frame, 0 if it couldn't complete, in which case
* SE_FL_RCV_MORE must be checked to know if some data remain pending (an empty
int block;
unsigned int flen = 0;
struct htx *htx = NULL;
- struct buffer *scbuf;
+ struct buffer *scbuf = NULL;
unsigned int sent;
+ int full = 0;
TRACE_ENTER(H2_EV_RX_FRAME|H2_EV_RX_DATA, h2c->conn, h2s);
- h2c->flags &= ~H2_CF_DEM_SFULL;
+ h2c->flags &= ~(H2_CF_DEM_SFULL | H2_CF_DEM_RXBUF);
- scbuf = h2_get_buf(h2c, &h2s->rxbuf);
+ /* Note: the size estimate is approximate due to HTX fragmentation,
+ * so here we are optimistic and try to get the work done without
+ * allocating an rxbuf if possible. If we fail, we'll retry more
+ * aggressively.
+ */
+ if ((!h2s_rxbuf_tail(h2s) || full || !h2s_may_append_to_rxbuf(h2s)) && !h2s_get_rxbuf(h2s)) {
+ h2c->flags |= H2_CF_DEM_RXBUF;
+ TRACE_STATE("waiting for an h2s rxbuf slot", H2_EV_RX_FRAME|H2_EV_RX_DATA|H2_EV_H2S_BLK, h2c->conn, h2s);
+ goto fail;
+ }
+
+ next_buffer:
+ /* now we have a non-full rxbuf */
+ scbuf = h2_get_buf(h2c, h2s_rxbuf_tail(h2s));
if (!scbuf) {
h2c->flags |= H2_CF_DEM_SALLOC;
TRACE_STATE("waiting for an h2s rxbuf", H2_EV_RX_FRAME|H2_EV_RX_DATA|H2_EV_H2S_BLK, h2c->conn, h2s);
htx = htx_from_buf(scbuf);
try_again:
+ full = 0;
flen = h2c->dfl - h2c->dpl;
if (!flen)
goto end_transfer;
block = htx_free_data_space(htx);
if (!block) {
+ full = 1;
+ if (h2s_get_rxbuf(h2s))
+ goto next_buffer;
+
h2c->flags |= H2_CF_DEM_SFULL;
TRACE_STATE("h2s rxbuf is full", H2_EV_RX_FRAME|H2_EV_RX_DATA|H2_EV_H2S_BLK, h2c->conn, h2s);
goto fail;
}
if (sent < flen) {
+ if (!sent)
+ full = 1;
+ if (h2s_get_rxbuf(h2s))
+ goto next_buffer;
+
h2c->flags |= H2_CF_DEM_SFULL;
TRACE_STATE("h2s rxbuf is full", H2_EV_RX_FRAME|H2_EV_RX_DATA|H2_EV_H2S_BLK, h2c->conn, h2s);
goto fail;
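
(Stripped of tracing and error handling, the producer-side flow above reduces
to the loop below: fill the tail rxbuf, and when HTX reports no room, chain a
fresh slot before giving up. This is a condensed sketch using the patch's
identifiers; the payload copy itself is elided.)

	for (;;) {
		struct buffer *scbuf = h2_get_buf(h2c, h2s_rxbuf_tail(h2s));
		struct htx *htx;

		if (!scbuf) {
			h2c->flags |= H2_CF_DEM_SALLOC; /* slot OK, storage missing */
			break;
		}
		htx = htx_from_buf(scbuf);
		if (htx_free_data_space(htx)) {
			/* ... copy as much of the DATA payload as fits ... */
			break;
		}
		if (h2s_get_rxbuf(h2s))
			continue;                  /* chained a fresh tail slot */
		h2c->flags |= H2_CF_DEM_SFULL;     /* ring exhausted, wait */
		break;
	}
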
struct h2c *h2c = h2s->h2c;
struct htx *h2s_htx = NULL;
struct htx *buf_htx = NULL;
+ struct buffer *rxbuf = NULL;
size_t ret = 0;
uint prev_h2c_flags = h2c->flags;
TRACE_ENTER(H2_EV_STRM_RECV, h2c->conn, h2s);
/* transfer possibly pending data to the upper layer */
- h2s_htx = htx_from_buf(&h2s->rxbuf);
+
+ xfer_next_buf:
+ rxbuf = h2s_rxbuf_head(h2s);
+ if (!rxbuf)
+ goto end; // may be NULL if empty
+
+ h2s_htx = htx_from_buf(rxbuf);
if (htx_is_empty(h2s_htx) && !(h2s_htx->flags & HTX_FL_PARSING_ERROR)) {
/* Here htx_to_buf() will set buffer data to 0 because
* the HTX is empty.
*/
- htx_to_buf(h2s_htx, &h2s->rxbuf);
+ htx_to_buf(h2s_htx, rxbuf);
goto end;
}
- ret = h2s_htx->data;
+ ret += h2s_htx->data;
buf_htx = htx_from_buf(buf);
/* <buf> is empty and the message is small enough, swap the
* buffers. */
if (htx_is_empty(buf_htx) && htx_used_space(h2s_htx) <= count) {
htx_to_buf(buf_htx, buf);
- htx_to_buf(h2s_htx, &h2s->rxbuf);
- b_xfer(buf, &h2s->rxbuf, b_data(&h2s->rxbuf));
+ htx_to_buf(h2s_htx, rxbuf);
+ b_xfer(buf, rxbuf, b_data(rxbuf));
goto end;
}
buf_htx->extra = (h2s_htx->extra ? (h2s_htx->data + h2s_htx->extra) : 0);
htx_to_buf(buf_htx, buf);
- htx_to_buf(h2s_htx, &h2s->rxbuf);
+ htx_to_buf(h2s_htx, rxbuf);
ret -= h2s_htx->data;
end:
/* release the rxbuf if it's not used anymore */
- if (!b_data(&h2s->rxbuf) && b_size(&h2s->rxbuf)) {
- b_free(&h2s->rxbuf);
+ if (rxbuf && !b_data(rxbuf) && b_size(rxbuf)) {
+ BUG_ON_HOT(rxbuf != _h2s_rxbuf_head(h2s));
+ b_free(_h2s_rxbuf_head(h2s));
offer_buffers(NULL, 1);
}
+ /* release the unused rxbuf slot */
+ if (rxbuf && !b_size(rxbuf)) {
+ h2s_put_rxbuf(h2s);
+ h2c->flags &= ~(H2_CF_DEM_RXBUF | H2_CF_DEM_SFULL);
+ goto xfer_next_buf;
+ }
+
/* tell the stream layer whether there are data left or not */
- if (b_data(&h2s->rxbuf))
+ if (h2s_rxbuf_cnt(h2s)) {
se_fl_set(h2s->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM);
+ BUG_ON_HOT(!buf->data);
+ }
else {
if (!(h2c->flags & H2_CF_IS_BACK) && (h2s->flags & (H2_SF_BODY_TUNNEL|H2_SF_ES_RCVD))) {
/* If request ES is reported to the upper layer, it means the
h2c->flags &= ~H2_CF_DEM_SFULL;
/* wake up processing if we've unblocked something */
- if ((prev_h2c_flags & ~h2c->flags) & H2_CF_DEM_SFULL)
+ if ((prev_h2c_flags & ~h2c->flags) & (H2_CF_DEM_SFULL | H2_CF_DEM_RXBUF))
h2c_restart_reading(h2c, 1);
+ BUG_ON_HOT(!buf->data && se_fl_test(h2s->sd, SE_FL_WANT_ROOM));
+
TRACE_LEAVE(H2_EV_STRM_RECV, h2c->conn, h2s);
return ret;
}
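
(The consumer side mirrors the producer: buffers are drained strictly from the
head of the chain, and both the storage and the slot are handed back as soon
as a buffer runs empty. Condensed sketch using the patch's identifiers, with
flag and trace handling elided.)

	struct buffer *rxbuf;

	while ((rxbuf = h2s_rxbuf_head(h2s))) {
		/* ... transfer HTX blocks from <rxbuf> to the upper layer ... */
		if (b_data(rxbuf))
			break;              /* receiver full: keep the rest */
		if (b_size(rxbuf)) {
			b_free(rxbuf);      /* storage no longer needed */
			offer_buffers(NULL, 1);
		}
		h2s_put_rxbuf(h2s);         /* slot back to ring, head advances */
	}
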
*/
static int h2_dump_h2s_info(struct buffer *msg, const struct h2s *h2s, const char *pfx)
{
+ const struct buffer *head, *tail;
int ret = 0;
if (!h2s)
return ret;
- chunk_appendf(msg, " h2s.id=%d .st=%s .flg=0x%04x .rxwin=%u .rxbuf=%u@%p+%u/%u",
+ head = h2s_rxbuf_head(h2s);
+ tail = h2s_rxbuf_tail(h2s);
+
+ chunk_appendf(msg, " h2s.id=%d .st=%s .flg=0x%04x .rxwin=%u .rxbuf.c=%u .t=%u@%p+%u/%u .h=%u@%p+%u/%u",
h2s->id, h2s_st_to_str(h2s->st), h2s->flags,
(uint)(h2s->next_max_ofs - h2s->curr_rx_ofs),
- (unsigned int)b_data(&h2s->rxbuf), b_orig(&h2s->rxbuf),
- (unsigned int)b_head_ofs(&h2s->rxbuf), (unsigned int)b_size(&h2s->rxbuf));
+ h2s_rxbuf_cnt(h2s),
+ tail ? (uint)b_data(tail) : 0,
+ tail ? b_orig(tail) : NULL,
+ tail ? (uint)b_head_ofs(tail) : 0,
+ tail ? (uint)b_size(tail) : 0,
+ head ? (uint)b_data(head) : 0,
+ head ? b_orig(head) : NULL,
+ head ? (uint)b_head_ofs(head) : 0,
+ head ? (uint)b_size(head) : 0);
if (pfx)
chunk_appendf(msg, "\n%s", pfx);
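
(With the extended format string above, a dump line produced by this function
might look like the following; all field values are made up for illustration:)

 h2s.id=5 .st=OPEN .flg=0x4003 .rxwin=65535 .rxbuf.c=2 .t=512@0x7f2a40021000+0/16384 .h=16384@0x7f2a40019000+0/16384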