]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MINOR: http_fetch: add "req.ungrpc" sample fetch for gRPC.
authorFrédéric Lécaille <flecaille@haproxy.com>
Mon, 25 Feb 2019 14:30:36 +0000 (15:30 +0100)
committerWilly Tarreau <w@1wt.eu>
Tue, 26 Feb 2019 15:27:05 +0000 (16:27 +0100)
This patch implements "req.ungrpc" sample fetch method to decode and
parse a gRPC request. It takes only one argument: a protocol buffers
field number to identify the protocol buffers message number to be looked up.
This argument is a sort of path in dotted notation to the terminal field number
to be retrieved.

  ex:
    req.ungrpc(1.2.3.4)

This sample fetch catch the data in raw mode, without interpreting them.
Some protocol buffers specific converters may be used to convert the data
to the correct type.

include/proto/protocol_buffers.h [new file with mode: 0644]
src/http_fetch.c

diff --git a/include/proto/protocol_buffers.h b/include/proto/protocol_buffers.h
new file mode 100644 (file)
index 0000000..d210a72
--- /dev/null
@@ -0,0 +1,185 @@
+/*
+ * include/proto/protocol_buffers.h
+ * This file contains functions and macros declarations for protocol buffers decoding.
+ *
+ * Copyright 2012 Willy Tarreau <w@1wt.eu>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation, version 2.1
+ * exclusively.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef _PROTO_PROTOCOL_BUFFERS_H
+#define _PROTO_PROTOCOL_BUFFERS_H
+
+#include <types/protocol_buffers.h>
+
+#define PBUF_TYPE_VARINT           0
+#define PBUF_TYPE_64BIT            1
+#define PBUF_TYPE_LENGTH_DELIMITED 2
+#define PBUF_TYPE_START_GROUP      3
+#define PBUF_TYPE_STOP_GROUP       4
+#define PBUF_TYPE_32BIT            5
+
+#define PBUF_VARINT_DONT_STOP_BIT       7
+#define PBUF_VARINT_DONT_STOP_BITMASK  (1 << PBUF_VARINT_DONT_STOP_BIT)
+#define PBUF_VARINT_DATA_BITMASK            ~PBUF_VARINT_DONT_STOP_BITMASK
+
+/*
+ * Decode a protocol buffers varint located in a buffer at <pos> address with
+ * <len> as length. The decoded value is stored at <val>.
+ * Returns 1 if succeeded, 0 if not.
+ */
+static inline int
+protobuf_varint(uint64_t *val, unsigned char *pos, size_t len)
+{
+       unsigned int shift;
+
+       *val = 0;
+       shift = 0;
+
+       while (len > 0) {
+               int stop = !(*pos & PBUF_VARINT_DONT_STOP_BITMASK);
+
+               *val |= ((uint64_t)(*pos & PBUF_VARINT_DATA_BITMASK)) << shift;
+
+               ++pos;
+               --len;
+
+               if (stop)
+                       break;
+               else if (!len)
+                       return 0;
+
+               shift += 7;
+               /* The maximum length in bytes of a 64-bit encoded value is 10. */
+               if (shift > 70)
+                       return 0;
+       }
+
+       return 1;
+}
+
+/*
+ * Decode a protocol buffers varint located in a buffer at <pos> offset address with
+ * <len> as length address. Update <pos> and <len> consequently. Decrease <*len>
+ * by the number of decoded bytes. The decoded value is stored at <val>.
+ * Returns 1 if succeeded, 0 if not.
+ */
+static inline int
+protobuf_decode_varint(uint64_t *val, unsigned char **pos, size_t *len)
+{
+       unsigned int shift;
+
+       *val = 0;
+       shift = 0;
+
+       while (*len > 0) {
+               int stop = !(**pos & PBUF_VARINT_DONT_STOP_BITMASK);
+
+               *val |= ((uint64_t)**pos & PBUF_VARINT_DATA_BITMASK) << shift;
+
+               ++*pos;
+               --*len;
+
+               if (stop)
+                       break;
+               else if (!*len)
+                       return 0;
+
+               shift += 7;
+               /* The maximum length in bytes of a 64-bit encoded value is 10. */
+               if (shift > 70)
+                       return 0;
+       }
+
+       return 1;
+}
+
+/*
+ * Skip a protocol buffer varint found at <pos> as position address with <len>
+ * as available length address. Update <*pos> to make it point to the next
+ * available byte. Decrease <*len> by the number of skipped bytes.
+ * Returns 1 if succeeded, 0 if not.
+ */
+static inline int
+protobuf_skip_varint(unsigned char **pos, size_t *len)
+{
+       unsigned int shift;
+
+       shift = 0;
+
+       while (*len > 0) {
+               int stop = !(**pos & PBUF_VARINT_DONT_STOP_BITMASK);
+
+               ++*pos;
+               --*len;
+
+               if (stop)
+                       break;
+               else if (!*len)
+                       return 0;
+
+               shift += 7;
+               /* The maximum length in bytes of a 64-bit encoded value is 10. */
+               if (shift > 70)
+                       return 0;
+       }
+
+       return 1;
+}
+
+/*
+ * If succeeded, return the length of a prococol buffers varint found at <pos> as
+ * position address, with <len> as address of the available bytes at <*pos>.
+ * Update <*pos> to make it point to the next available byte. Decrease <*len>
+ * by the number of bytes used to encode this varint.
+ * Return -1 if failed.
+ */
+static inline int
+protobuf_varint_getlen(unsigned char **pos, size_t *len)
+{
+       unsigned char *spos;
+       unsigned int shift;
+
+       shift = 0;
+       spos = *pos;
+
+       while (*len > 0) {
+               int stop = !(**pos & PBUF_VARINT_DONT_STOP_BITMASK);
+
+               ++*pos;
+               --*len;
+
+               if (stop)
+                       break;
+               else if (!*len)
+                       return -1;
+
+               shift += 7;
+               /* The maximum length in bytes of a 64-bit encoded value is 10. */
+               if (shift > 70)
+                       return -1;
+       }
+
+       return *pos - spos;
+}
+
+#endif /* _PROTO_PROTOCOL_BUFFERS_H */
+
+/*
+ * Local variables:
+ *  c-indent-level: 8
+ *  c-basic-offset: 8
+ * End:
+ */
index 51f2ef13f22333d35045c49e5d39912311ce1bfa..8f88646ea886aee17b6b1d4f102139fe16fc4438 100644 (file)
@@ -39,6 +39,7 @@
 #include <proto/log.h>
 #include <proto/obj_type.h>
 #include <proto/proto_http.h>
+#include <proto/protocol_buffers.h>
 #include <proto/sample.h>
 #include <proto/stream.h>
 
@@ -1516,6 +1517,245 @@ static int smp_fetch_hdr_val(const struct arg *args, struct sample *smp, const c
        return ret;
 }
 
+static inline struct buffer *
+smp_fetch_body_buf(const struct arg *args, struct sample *smp)
+{
+       struct buffer *buf;
+
+       if (IS_HTX_SMP(smp) || (smp->px->mode == PR_MODE_TCP)) {
+               /* HTX version */
+               struct htx *htx = smp_prefetch_htx(smp, args);
+               int32_t pos;
+
+               if (!htx)
+                       return NULL;
+
+               buf = get_trash_chunk();
+               for (pos = htx_get_head(htx); pos != -1; pos = htx_get_next(htx, pos)) {
+                       struct htx_blk *blk = htx_get_blk(htx, pos);
+                       enum htx_blk_type type = htx_get_blk_type(blk);
+
+                       if (type == HTX_BLK_EOM || type == HTX_BLK_EOD)
+                               break;
+                       if (type == HTX_BLK_DATA) {
+                               if (!htx_data_to_h1(htx_get_blk_value(htx, blk), buf, 0))
+                                       return NULL;
+                       }
+               }
+       }
+       else {
+               /* LEGACY version */
+               struct http_msg *msg;
+               unsigned long len;
+               unsigned long block1;
+               char *body;
+
+               if (smp_prefetch_http(smp->px, smp->strm, smp->opt, args, smp, 1) <= 0)
+                       return NULL;
+
+               if ((smp->opt & SMP_OPT_DIR) == SMP_OPT_DIR_REQ)
+                       msg = &smp->strm->txn->req;
+               else
+                       msg = &smp->strm->txn->rsp;
+
+               len  = http_body_bytes(msg);
+               body = c_ptr(msg->chn, -http_data_rewind(msg));
+
+               block1 = len;
+               if (block1 > b_wrap(&msg->chn->buf) - body)
+                       block1 = b_wrap(&msg->chn->buf) - body;
+
+               buf = get_trash_chunk();
+               if (block1 == len) {
+                       /* buffer is not wrapped (or empty) */
+                       memcpy(buf->area, body, len);
+               }
+               else {
+                       /* buffer is wrapped, we need to defragment it */
+                       memcpy(buf->area, body, block1);
+                       memcpy(buf->area + block1, b_orig(&msg->chn->buf),
+                              len - block1);
+               }
+               buf->data = len;
+       }
+
+       return buf;
+}
+
+#define GRPC_MSG_COMPRESS_FLAG_SZ 1 /* 1 byte */
+#define GRPC_MSG_LENGTH_SZ        4 /* 4 bytes */
+#define GRPC_MSG_HEADER_SZ        (GRPC_MSG_COMPRESS_FLAG_SZ + GRPC_MSG_LENGTH_SZ)
+
+/*
+ * Fetch a gRPC field value. Takes a mandatory argument: the field identifier
+ * (dotted notation) internally represented as an array of unsigned integers
+ * and its size.
+ * Return 1 if the field was found, 0 if not.
+ */
+static int smp_fetch_req_ungrpc(const struct arg *args, struct sample *smp, const char *kw, void *private)
+{
+       struct buffer *body;
+       unsigned char *pos;
+       size_t grpc_left;
+       unsigned int *fid;
+       size_t fid_sz;
+
+       if (!smp->strm)
+               return 0;
+
+       fid = args[0].data.fid.ids;
+       fid_sz = args[0].data.fid.sz;
+
+       body = smp_fetch_body_buf(args, smp);
+       if (!body)
+               return 0;
+
+       pos = (unsigned char *)body->area;
+       /* Remaining bytes in the body to be parsed. */
+       grpc_left = body->data;
+
+       while (grpc_left > GRPC_MSG_COMPRESS_FLAG_SZ + GRPC_MSG_LENGTH_SZ) {
+               int next_field, found;
+               size_t grpc_msg_len, left;
+               unsigned int wire_type, field_number;
+               uint64_t key, elen;
+
+               grpc_msg_len = left = ntohl(*(uint32_t *)(pos + GRPC_MSG_COMPRESS_FLAG_SZ));
+
+               pos += GRPC_MSG_HEADER_SZ;
+               grpc_left -= GRPC_MSG_HEADER_SZ;
+
+               if (grpc_left < left)
+                       return 0;
+
+               found = 1;
+               /* Length of the length-delimited messages if any. */
+               elen = 0;
+
+               /* Message decoding: there may be serveral key+value protobuf pairs by
+                * gRPC message.
+                */
+               next_field = 0;
+               while (next_field < fid_sz) {
+                       uint64_t sleft;
+
+                       if ((ssize_t)left <= 0)
+                               return 0;
+
+                       /* Remaining bytes saving. */
+                       sleft = left;
+
+                       /* Key decoding */
+                       if (!protobuf_decode_varint(&key, &pos, &left))
+                               return 0;
+
+                       wire_type = key & 0x7;
+                       field_number = key >> 3;
+                       found = field_number == fid[next_field];
+
+                       if (found && field_number != fid[next_field])
+                               found = 0;
+
+                       switch (wire_type) {
+                       case PBUF_TYPE_VARINT:
+                       {
+                               if (!found) {
+                                       protobuf_skip_varint(&pos, &left);
+                               } else if (next_field == fid_sz - 1) {
+                                       int varint_len;
+                                       unsigned char *spos = pos;
+
+                                       varint_len = protobuf_varint_getlen(&pos, &left);
+                                       if (varint_len == -1)
+                                               return 0;
+
+                                       smp->data.type = SMP_T_BIN;
+                                       smp->data.u.str.area = (char *)spos;
+                                       smp->data.u.str.data = varint_len;
+                                       smp->flags = SMP_F_VOL_TEST;
+                                       return 1;
+                               }
+                               break;
+                       }
+
+                       case PBUF_TYPE_64BIT:
+                       {
+                               if (!found) {
+                                       pos += sizeof(uint64_t);
+                                       left -= sizeof(uint64_t);
+                               } else if (next_field == fid_sz - 1) {
+                                       smp->data.type = SMP_T_BIN;
+                                       smp->data.u.str.area = (char *)pos;
+                                       smp->data.u.str.data = sizeof(uint64_t);
+                                       smp->flags = SMP_F_VOL_TEST;
+                                       return 1;
+                               }
+                               break;
+                       }
+
+                       case PBUF_TYPE_LENGTH_DELIMITED:
+                       {
+                               /* Decode the length of this length-delimited field. */
+                               if (!protobuf_decode_varint(&elen, &pos, &left))
+                                       return 0;
+
+                               if (elen > left)
+                                       return 0;
+
+                               /* The size of the current field is computed from here do skip
+                                * the bytes to encode the previous lenght.*
+                                */
+                               sleft = left;
+                               if (!found) {
+                                       /* Skip the current length-delimited field. */
+                                       pos += elen;
+                                       left -= elen;
+                                       break;
+                               } else if (next_field == fid_sz - 1) {
+                                       smp->data.type = SMP_T_BIN;
+                                       smp->data.u.str.area = (char *)pos;
+                                       smp->data.u.str.data = elen;
+                                       smp->flags = SMP_F_VOL_TEST;
+                                       return 1;
+                               }
+                               break;
+                       }
+
+                       case PBUF_TYPE_32BIT:
+                       {
+                               if (!found) {
+                                       pos += sizeof(uint32_t);
+                                       left -= sizeof(uint32_t);
+                               } else if (next_field == fid_sz - 1) {
+                                       smp->data.type = SMP_T_BIN;
+                                       smp->data.u.str.area = (char *)pos;
+                                       smp->data.u.str.data = sizeof(uint32_t);
+                                       smp->flags = SMP_F_VOL_TEST;
+                                       return 1;
+                               }
+                               break;
+                       }
+
+                       default:
+                               return 0;
+                       }
+
+                       if ((ssize_t)(elen) > 0)
+                               elen -= sleft - left;
+
+                       if (found) {
+                               next_field++;
+                       }
+                       else if ((ssize_t)elen <= 0) {
+                               next_field = 0;
+                       }
+               }
+               grpc_left -= grpc_msg_len;
+       }
+
+       return 0;
+}
+
 /* Fetch an HTTP header's IP value. takes a mandatory argument of type string
  * and an optional one of type int to designate a specific occurrence.
  * It returns an IPv4 or IPv6 address.
@@ -2882,6 +3122,7 @@ static struct sample_fetch_kw_list sample_fetch_keywords = {ILH, {
        { "req.hdr_ip",         smp_fetch_hdr_ip,             ARG2(0,STR,SINT), val_hdr, SMP_T_IPV4, SMP_USE_HRQHV },
        { "req.hdr_names",      smp_fetch_hdr_names,          ARG1(0,STR),      NULL,    SMP_T_STR,  SMP_USE_HRQHV },
        { "req.hdr_val",        smp_fetch_hdr_val,            ARG2(0,STR,SINT), val_hdr, SMP_T_SINT, SMP_USE_HRQHV },
+       { "req.ungrpc",         smp_fetch_req_ungrpc,         ARG1(1, PBUF_FNUM), NULL,  SMP_T_BIN,  SMP_USE_HRQHV },
 
        /* explicit req.{cook,hdr} are used to force the fetch direction to be response-only */
        { "res.cook",           smp_fetch_cookie,             ARG1(0,STR),      NULL,    SMP_T_STR,  SMP_USE_HRSHV },