]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: filters: Adapt filters API to allow again TCP filtering on HTX streams
authorChristopher Faulet <cfaulet@haproxy.com>
Tue, 12 Nov 2019 10:13:01 +0000 (11:13 +0100)
committerChristopher Faulet <cfaulet@haproxy.com>
Fri, 15 Nov 2019 12:43:08 +0000 (13:43 +0100)
This change make the payload filtering uniform between TCP and HTTP
filters. Now, in TCP, like in HTTP, there is only one callback responsible to
forward data. Thus, old callbacks, tcp_data() and tcp_forward_data(), are
replaced by a single callback function, tcp_payload(). This new callback gets
the offset in the payload to (re)start the filtering and the maximum amount of
data it can forward. It is the filter's responsibility to be compatible with HTX
streams. If not, it must not set the flag FLT_CFG_FL_HTX.

Because of this change, nxt and fwd offsets are no longer needed. Thus they are
removed from the filter structure with their update functions,
flt_change_next_size() and flt_change_forward_size(). Moreover, the trace filter
has been updated accordingly.

This patch breaks the compatibility with the old API. Thus it should probably
not be backported. But, AFAIK, there is no TCP filter, thus the breakage is very
limited.

include/proto/filters.h
include/types/filters.h
src/filters.c
src/flt_trace.c

index 1ca47c7f431f1e5a4ac5d2be6769db14dd009fa0..2ece1894dd31b36486d3cb0588e54a48159eec23 100644 (file)
@@ -45,13 +45,6 @@ extern const char *fcgi_flt_id;
 #define FLT_STRM_OFF(s, chn) (strm_flt(s)->offset[CHN_IDX(chn)])
 #define FLT_OFF(flt, chn) ((flt)->offset[CHN_IDX(chn)])
 
-#define FLT_NXT(flt, chn) ((flt)->next[CHN_IDX(chn)])
-#define FLT_FWD(flt, chn) ((flt)->fwd[CHN_IDX(chn)])
-#define flt_req_nxt(flt) ((flt)->next[0])
-#define flt_rsp_nxt(flt) ((flt)->next[1])
-#define flt_req_fwd(flt) ((flt)->fwd[0])
-#define flt_rsp_fwd(flt) ((flt)->fwd[1])
-
 #define HAS_FILTERS(strm)           ((strm)->strm_flt.flags & STRM_FLT_FL_HAS_FILTERS)
 
 #define HAS_REQ_DATA_FILTERS(strm)  ((strm)->strm_flt.nb_req_data_filters != 0)
@@ -174,58 +167,6 @@ unregister_data_filter(struct stream *s, struct channel *chn, struct filter *fil
        }
 }
 
-/* This function must be called when a filter alter incoming data. It updates
- * next offset value of all filter's predecessors. Do not call this function
- * when a filter change the size of incomding data leads to an undefined
- * behavior.
- *
- * This is the filter's responsiblitiy to update data itself. For now, it is
- * unclear to know how to handle data updates, so we do the minimum here. For
- * example, if you filter an HTTP message, we must update msg->next and
- * msg->chunk_len values.
- */
-static inline void
-flt_change_next_size(struct filter *filter, struct channel *chn, int len)
-{
-       struct stream *s = chn_strm(chn);
-       struct filter *f;
-
-       list_for_each_entry(f, &strm_flt(s)->filters, list) {
-               if (f == filter)
-                       break;
-               if (IS_DATA_FILTER(filter, chn))
-                       FLT_NXT(f, chn) += len;
-       }
-}
-
-/* This function must be called when a filter alter forwarded data. It updates
- * offset values (next and forward) of all filters. Do not call this function
- * when a filter change the size of forwarded data leads to an undefined
- * behavior.
- *
- * This is the filter's responsiblitiy to update data itself. For now, it is
- * unclear to know how to handle data updates, so we do the minimum here. For
- * example, if you filter an HTTP message, we must update msg->next and
- * msg->chunk_len values.
- */
-static inline void
-flt_change_forward_size(struct filter *filter, struct channel *chn, int len)
-{
-       struct stream *s = chn_strm(chn);
-       struct filter *f;
-       int before = 1;
-
-       list_for_each_entry(f, &strm_flt(s)->filters, list) {
-               if (f == filter)
-                       before = 0;
-               if (IS_DATA_FILTER(filter, chn)) {
-                       if (before)
-                               FLT_FWD(f, chn) += len;
-                       FLT_NXT(f, chn) += len;
-               }
-       }
-}
-
 /* This function must be called when a filter alter payload data. It updates
  * offsets of all previous filters and the offset of the stream. Do not call
  * this function when a filter change the size of payload data leads to an
index c91eeae391ce1878112d89cf67d1dbc9850682d8..0b549510d2ae056ab086efb18d49cae759575569 100644 (file)
@@ -138,12 +138,11 @@ struct flt_kw_list {
  *                          to the client (mainly, when an error or a redirect
  *                          occur).
  *                          Returns nothing.
- *  - tcp_data            : Called when unparsed data are available.
- *                          Returns a negative value if an error occurs, else
- *                          the number of consumed bytes.
- *  - tcp_forward_data    : Called when some data can be consumed.
+ *
+ *
+ *  - tcp_payload         : Called when some data can be consumed.
  *                          Returns a negative value if an error occurs, else
- *                          or the number of forwarded bytes.
+ *                          the number of forwarded bytes.
  */
 struct flt_ops {
        /*
@@ -186,9 +185,8 @@ struct flt_ops {
        /*
         * TCP callbacks
         */
-       int  (*tcp_data)        (struct stream *s, struct filter *f, struct channel *chn);
-       int  (*tcp_forward_data)(struct stream *s, struct filter *f, struct channel *chn,
-                                unsigned int len);
+       int  (*tcp_payload)       (struct stream *s, struct filter *f, struct channel *chn,
+                                   unsigned int offset, unsigned int len);
 };
 
 /* Flags set on a filter config */
@@ -227,11 +225,8 @@ struct filter {
        struct flt_conf *config;           /* the filter's configuration */
        void           *ctx;               /* The filter context (opaque) */
        unsigned short  flags;             /* FLT_FL_* */
-       unsigned int    next[2];           /* Offset, relative to buf->p, to the next byte to parse for a specific channel
-                                           * 0: request channel, 1: response channel */
-       unsigned int    fwd[2];            /* Offset, relative to buf->p, to the next byte to forward for a specific channel
+       unsigned long long offset[2];      /* Offset of input data already filtered for a specific channel
                                            * 0: request channel, 1: response channel */
-       unsigned long long offset[2];
        unsigned int    pre_analyzers;     /* bit field indicating analyzers to pre-process */
        unsigned int    post_analyzers;    /* bit field indicating analyzers to post-process */
        struct list     list;              /* Next filter for the same proxy/stream */
index c7f3ebd791baf5b022c8a00d547a423054f3e642..090146e68d0f06f0e807dd66e32807258f7b7d2d 100644 (file)
@@ -670,9 +670,7 @@ flt_start_analyze(struct stream *s, struct channel *chn, unsigned int an_bit)
                                continue;
                }
 
-               FLT_NXT(filter, chn) = 0;
-               FLT_FWD(filter, chn) = 0;
-
+               FLT_OFF(filter, chn) = 0;
                if (FLT_OPS(filter)->channel_start_analyze) {
                        DBG_TRACE_DEVEL(FLT_ID(filter), STRM_EV_FLT_ANA, s);
                        ret = FLT_OPS(filter)->channel_start_analyze(s, filter, chn);
@@ -799,8 +797,7 @@ flt_end_analyze(struct stream *s, struct channel *chn, unsigned int an_bit)
                goto sync;
 
        RESUME_FILTER_LOOP(s, chn) {
-               FLT_NXT(filter, chn) = 0;
-               FLT_FWD(filter, chn) = 0;
+               FLT_OFF(filter, chn) = 0;
                unregister_data_filter(s, chn, filter);
 
                if (FLT_OPS(filter)->channel_end_analyze) {
@@ -850,118 +847,42 @@ flt_end_analyze(struct stream *s, struct channel *chn, unsigned int an_bit)
 
 
 /*
- * Calls 'tcp_data' callback for all "data" filters attached to a stream. This
- * function is called when incoming data are available. It takes care to update
- * the next offset of filters and adjusts available data to be sure that a
- * filter cannot parse more data than its predecessors. A filter can choose to
- * not consume all available data. Returns -1 if an error occurs, the number of
- * consumed bytes otherwise.
- */
-static int
-flt_data(struct stream *s, struct channel *chn)
-{
-       struct filter *filter;
-       unsigned int   buf_i;
-       int            delta = 0, ret = 0;
-
-       /* Save buffer state */
-       buf_i = ci_data(chn);
-
-       list_for_each_entry(filter, &strm_flt(s)->filters, list) {
-               unsigned int *nxt;
-
-               /* Call "data" filters only */
-               if (!IS_DATA_FILTER(filter, chn))
-                       continue;
-
-               nxt = &FLT_NXT(filter, chn);
-               if (FLT_OPS(filter)->tcp_data) {
-                       unsigned int i = ci_data(chn);
-
-                       DBG_TRACE_DEVEL(FLT_ID(filter), STRM_EV_TCP_ANA|STRM_EV_FLT_ANA, s);
-                       ret = FLT_OPS(filter)->tcp_data(s, filter, chn);
-                       if (ret < 0)
-                               break;
-                       delta += (int)(ci_data(chn) - i);
-
-                       /* Increase next offset of the current filter */
-                       *nxt += ret;
-
-                       /* And set this value as the bound for the next
-                        * filter. It will not able to parse more data than the
-                        * current one. */
-                       b_set_data(&chn->buf, co_data(chn) + *nxt);
-               }
-               else {
-                       /* Consume all available data */
-                       *nxt = ci_data(chn);
-               }
-
-               /* Update <ret> value to be sure to have the last one when we
-                * exit from the loop. This value will be used to know how much
-                * data are "forwardable" */
-               ret = *nxt;
-       }
-
-       /* Restore the original buffer state */
-       b_set_data(&chn->buf, co_data(chn) + buf_i + delta);
-
-       return ret;
-}
-
-/*
- * Calls 'tcp_forward_data' callback for all "data" filters attached to a
- * stream. This function is called when some data can be forwarded. It takes
- * care to update the forward offset of filters and adjusts "forwardable" data
- * to be sure that a filter cannot forward more data than its predecessors. A
- * filter can choose to not forward all parsed data. Returns a negative value if
- * an error occurs, else the number of forwarded bytes.
+ * Calls 'tcp_payload' callback for all "data" filters attached to a
+ * stream. This function is called when some data can be forwarded in the
+ * AN_REQ_FLT_XFER_BODY and AN_RES_FLT_XFER_BODY analyzers. It takes care to
+ * update the filters and the stream offset to be sure that a filter cannot
+ * forward more data than its predecessors. A filter can choose to not forward
+ * all data. Returns a negative value if an error occurs, else the number of
+ * forwarded bytes.
  */
-static int
-flt_forward_data(struct stream *s, struct channel *chn, unsigned int len)
+int
+flt_tcp_payload(struct stream *s, struct channel *chn, unsigned int len)
 {
        struct filter *filter;
-       int            ret = len;
+       unsigned long long *strm_off = &FLT_STRM_OFF(s, chn);
+       unsigned int out = co_data(chn);
+       int ret = len - out;
 
+       DBG_TRACE_ENTER(STRM_EV_TCP_ANA|STRM_EV_FLT_ANA, s);
        list_for_each_entry(filter, &strm_flt(s)->filters, list) {
-               unsigned int *fwd;
-
                /* Call "data" filters only */
                if (!IS_DATA_FILTER(filter, chn))
                        continue;
+               if (FLT_OPS(filter)->tcp_payload) {
+                       unsigned long long *flt_off = &FLT_OFF(filter, chn);
+                       unsigned int offset = *flt_off - *strm_off;
 
-               fwd = &FLT_FWD(filter, chn);
-               if (FLT_OPS(filter)->tcp_forward_data) {
-                       /* Remove bytes that the current filter considered as
-                        * forwarded */
                        DBG_TRACE_DEVEL(FLT_ID(filter), STRM_EV_TCP_ANA|STRM_EV_FLT_ANA, s);
-                       ret = FLT_OPS(filter)->tcp_forward_data(s, filter, chn, ret - *fwd);
+                       ret = FLT_OPS(filter)->tcp_payload(s, filter, chn, out + offset, ret - offset);
                        if (ret < 0)
                                goto end;
+                       *flt_off += ret;
+                       ret += offset;
                }
-
-               /* Adjust bytes that the current filter considers as
-                * forwarded */
-               *fwd += ret;
-
-               /* And set this value as the bound for the next filter. It will
-                * not able to forward more data than the current one. */
-               ret = *fwd;
        }
-
-       if (!ret)
-               goto end;
-
-       /* Finally, adjust filters offsets by removing data that HAProxy will
-        * forward. */
-       list_for_each_entry(filter, &strm_flt(s)->filters, list) {
-               if (!IS_DATA_FILTER(filter, chn))
-                       continue;
-               FLT_NXT(filter, chn) -= ret;
-               FLT_FWD(filter, chn) -= ret;
-       }
-
+       *strm_off += ret;
  end:
+       DBG_TRACE_LEAVE(STRM_EV_TCP_ANA|STRM_EV_FLT_ANA, s);
        return ret;
 }
 
@@ -976,12 +897,13 @@ flt_forward_data(struct stream *s, struct channel *chn, unsigned int len)
 int
 flt_xfer_data(struct stream *s, struct channel *chn, unsigned int an_bit)
 {
+       unsigned int len;
        int ret = 1;
 
        DBG_TRACE_ENTER(STRM_EV_STRM_ANA|STRM_EV_TCP_ANA|STRM_EV_FLT_ANA, s);
 
        /* If there is no "data" filters, we do nothing */
-       if (!HAS_DATA_FILTERS(s, chn) || (s->flags & SF_HTX))
+       if (!HAS_DATA_FILTERS(s, chn))
                goto end;
 
        /* Be sure that the output is still opened. Else we stop the data
@@ -990,26 +912,30 @@ flt_xfer_data(struct stream *s, struct channel *chn, unsigned int an_bit)
            ((chn->flags & CF_SHUTW) && (chn->to_forward || co_data(chn))))
                goto end;
 
-       /* Let all "data" filters parsing incoming data */
-       ret = flt_data(s, chn);
-       if (ret < 0)
-               goto end;
+       if (s->flags & SF_HTX) {
+               struct htx *htx = htxbuf(&chn->buf);
+               len = htx->data;
+       }
+       else
+               len = c_data(chn);
 
-       /* And forward them */
-       ret = flt_forward_data(s, chn, ret);
+       ret = flt_tcp_payload(s, chn, len);
        if (ret < 0)
                goto end;
-
-       /* Consume data that all filters consider as forwarded. */
        c_adv(chn, ret);
 
        /* Stop waiting data if the input in closed and no data is pending or if
         * the output is closed. */
-       if ((chn->flags & CF_SHUTW) ||
-           ((chn->flags & CF_SHUTR) && !ci_data(chn))) {
+       if (chn->flags & CF_SHUTW) {
                ret = 1;
                goto end;
        }
+       if (chn->flags & CF_SHUTR) {
+               if (((s->flags & SF_HTX) && htx_is_empty(htxbuf(&chn->buf))) || c_empty(chn)) {
+                       ret = 1;
+                       goto end;
+               }
+       }
 
        /* Wait for data */
        DBG_TRACE_DEVEL("waiting for more data", STRM_EV_STRM_ANA|STRM_EV_TCP_ANA|STRM_EV_FLT_ANA, s);
index 5ed515802c2cfca3bbb009c7a2d156315b5b1631..e349431a692a31e29a2f21dc0af4343e6af2cb00 100644 (file)
@@ -110,17 +110,17 @@ trace_hexdump(struct ist ist)
 }
 
 static void
-trace_raw_hexdump(struct buffer *buf, int len, int out)
+trace_raw_hexdump(struct buffer *buf, unsigned int offset, unsigned int len)
 {
        unsigned char p[len];
        int block1, block2;
 
        block1 = len;
-       if (block1 > b_contig_data(buf, out))
-               block1 = b_contig_data(buf, out);
+       if (block1 > b_contig_data(buf, offset))
+               block1 = b_contig_data(buf, offset);
        block2 = len - block1;
 
-       memcpy(p, b_head(buf), block1);
+       memcpy(p, b_peek(buf, offset), block1);
        memcpy(p+block1, b_orig(buf), block2);
        trace_hexdump(ist2(p, len));
 }
@@ -153,6 +153,31 @@ trace_htx_hexdump(struct htx *htx, unsigned int offset, unsigned int len)
        }
 }
 
+static unsigned int
+trace_get_htx_datalen(struct htx *htx, unsigned int offset, unsigned int len)
+{
+       struct htx_blk *blk;
+       uint32_t sz, data = 0;
+
+       for (blk = htx_get_first_blk(htx); blk; blk = htx_get_next_blk(htx, blk)) {
+               if (htx_get_blk_type(blk) != HTX_BLK_DATA)
+                       break;
+
+               sz = htx_get_blksz(blk);
+               if (offset >= sz) {
+                       offset -= sz;
+                       continue;
+               }
+               data  += sz - offset;
+               offset = 0;
+               if (data > len) {
+                       data = len;
+                       break;
+               }
+       }
+       return data;
+}
+
 /***************************************************************************
  * Hooks that manage the filter lifecycle (init/check/deinit)
  **************************************************************************/
@@ -441,28 +466,9 @@ trace_http_payload(struct stream *s, struct filter *filter, struct http_msg *msg
        int ret = len;
 
        if (ret && conf->rand_forwarding) {
-               struct htx *htx = htxbuf(&msg->chn->buf);
-               struct htx_blk *blk;
-               uint32_t sz, data = 0;
-               unsigned int off = offset;
+               unsigned int data = trace_get_htx_datalen(htxbuf(&msg->chn->buf), offset, len);
 
-               for (blk = htx_get_first_blk(htx); blk; blk = htx_get_next_blk(htx, blk)) {
-                       if (htx_get_blk_type(blk) != HTX_BLK_DATA)
-                               break;
-
-                       sz = htx_get_blksz(blk);
-                       if (off >= sz) {
-                               off -= sz;
-                               continue;
-                       }
-                       data  += sz - off;
-                       off = 0;
-                       if (data > len) {
-                               data = len;
-                               break;
-                       }
-               }
-               if (data)  {
+               if (data) {
                        ret = random() % (ret+1);
                        if (!ret || ret >= data)
                                ret = len;
@@ -476,7 +482,7 @@ trace_http_payload(struct stream *s, struct filter *filter, struct http_msg *msg
                   offset, len, ret);
 
         if (conf->hexdump)
-                trace_htx_hexdump(htxbuf(&msg->chn->buf), offset, len);
+                trace_htx_hexdump(htxbuf(&msg->chn->buf), offset, ret);
 
         if (ret != len)
                 task_wakeup(s->task, TASK_WOKEN_MSG);
@@ -520,51 +526,51 @@ trace_http_reply(struct stream *s, struct filter *filter, short status,
  * Hooks to filter TCP data
  *************************************************************************/
 static int
-trace_tcp_data(struct stream *s, struct filter *filter, struct channel *chn)
+trace_tcp_payload(struct stream *s, struct filter *filter, struct channel *chn,
+                 unsigned int offset, unsigned int len)
 {
        struct trace_config *conf = FLT_CONF(filter);
-       int                  avail = ci_data(chn) - FLT_NXT(filter, chn);
-       int                  ret  = avail;
+       int ret = len;
 
-       if (ret && conf->rand_parsing)
-               ret = random() % (ret+1);
+       if (s->flags & SF_HTX) {
+               if (ret && conf->rand_forwarding) {
+                       unsigned int data = trace_get_htx_datalen(htxbuf(&chn->buf), offset, len);
 
-       FLT_STRM_TRACE(conf, s, "%-25s: channel=%-10s - mode=%-5s (%s) - next=%u - avail=%u - consume=%d",
-                  __FUNCTION__,
-                  channel_label(chn), proxy_mode(s), stream_pos(s),
-                  FLT_NXT(filter, chn), avail, ret);
+                       if (data) {
+                               ret = random() % (ret+1);
+                               if (!ret || ret >= data)
+                                       ret = len;
+                       }
+               }
 
-       if (ret != avail)
-               task_wakeup(s->task, TASK_WOKEN_MSG);
-       return ret;
-}
+               FLT_STRM_TRACE(conf, s, "%-25s: channel=%-10s - mode=%-5s (%s) - "
+                              "offset=%u - len=%u - forward=%d",
+                              __FUNCTION__,
+                              channel_label(chn), proxy_mode(s), stream_pos(s),
+                              offset, len, ret);
 
-static int
-trace_tcp_forward_data(struct stream *s, struct filter *filter, struct channel *chn,
-                unsigned int len)
-{
-       struct trace_config *conf = FLT_CONF(filter);
-       int                  ret  = len;
+               if (conf->hexdump)
+                       trace_htx_hexdump(htxbuf(&chn->buf), offset, ret);
+       }
+       else {
 
-       if (ret && conf->rand_forwarding)
-               ret = random() % (ret+1);
+               if (ret && conf->rand_forwarding)
+                       ret = random() % (ret+1);
 
-       FLT_STRM_TRACE(conf, s, "%-25s: channel=%-10s - mode=%-5s (%s) - len=%u - fwd=%u - forward=%d",
-                  __FUNCTION__,
-                  channel_label(chn), proxy_mode(s), stream_pos(s), len,
-                  FLT_FWD(filter, chn), ret);
+               FLT_STRM_TRACE(conf, s, "%-25s: channel=%-10s - mode=%-5s (%s) - "
+                              "offset=%u - len=%u - forward=%d",
+                              __FUNCTION__,
+                              channel_label(chn), proxy_mode(s), stream_pos(s),
+                              offset, len, ret);
 
-       if (conf->hexdump) {
-               c_adv(chn, FLT_FWD(filter, chn));
-               trace_raw_hexdump(&chn->buf, ret, co_data(chn));
-               c_rew(chn, FLT_FWD(filter, chn));
+               if (conf->hexdump)
+                       trace_raw_hexdump(&chn->buf, offset, ret);
        }
 
-       if (ret != len)
-               task_wakeup(s->task, TASK_WOKEN_MSG);
+        if (ret != len)
+                task_wakeup(s->task, TASK_WOKEN_MSG);
        return ret;
 }
-
 /********************************************************************
  * Functions that manage the filter initialization
  ********************************************************************/
@@ -598,8 +604,7 @@ struct flt_ops trace_ops = {
        .http_reply          = trace_http_reply,
 
        /* Filter TCP data */
-       .tcp_data         = trace_tcp_data,
-       .tcp_forward_data = trace_tcp_forward_data,
+       .tcp_payload        = trace_tcp_payload,
 };
 
 /* Return -1 on error, else 0 */