]> git.ipfire.org Git - thirdparty/FORT-validator.git/commitdiff
Remove PDU queuing
authorAlberto Leiva Popper <ydahhrk@gmail.com>
Sat, 21 Oct 2023 03:08:46 +0000 (21:08 -0600)
committerAlberto Leiva Popper <ydahhrk@gmail.com>
Tue, 24 Oct 2023 01:33:02 +0000 (19:33 -0600)
Tweak ideated during the commit message of the previous commit.

- If the read() yields at least one Error Report, drop the connection.

This is because all the server-received error codes currently defined
are supposed to result in immediate connection termination.

If a future RFC defines a nonfatal error code, Error Reports should
probably be downgraded to the 'last PDU' rule below.

- Otherwise, if a read() yields multile PDUs, drop all except for the
  last one.

Since it's what the client is most likely expecting, I guess. Serial
Queries and Reset Queries are alternate means to achieve the same goal,
so it doesn't make sense to queue them.

src/rtr/pdu.h
src/rtr/pdu_handler.c
src/rtr/pdu_handler.h
src/rtr/pdu_sender.c
src/rtr/pdu_stream.c
src/rtr/pdu_stream.h
src/rtr/rtr.c
test/rtr/pdu_handler_test.c
test/rtr/pdu_stream_test.c

index b93947e5e9bdfa90d8afc30d20c21c4b9a5d5846..f8e42e7d688a6e8544b53f0ef8d874444c7ae9b5 100644 (file)
@@ -62,34 +62,6 @@ char const *pdutype2str(enum pdu_type);
 
 #define RTRPDU_MAX_LEN2                        RTRPDU_ERROR_REPORT_MAX_LEN
 
-struct pdu_header {
-       enum rtr_version version;
-       enum pdu_type type;
-       union {
-               uint16_t session_id;
-               uint16_t reserved;
-               uint16_t error_code;
-       } m; /* Note: "m" stands for "meh." I have no idea what to call this. */
-       uint32_t length;
-};
-
-struct serial_query_pdu {
-       struct  pdu_header header;
-       uint32_t        serial_number;
-};
-
-struct reset_query_pdu {
-       struct  pdu_header header;
-};
-
-struct error_report_pdu {
-       struct  pdu_header header;
-       uint32_t        errpdu_len;
-       unsigned char   errpdu[RTRPDU_MAX_LEN];
-       uint32_t        errmsg_len;
-       char            *errmsg;
-};
-
 static inline size_t
 rtrpdu_error_report_len(uint32_t errpdu_len, uint32_t errmsg_len)
 {
index e9277b18a66af9128430bd79fca7e1464abb00b2..b8014a4648dd7ced17a0654a2948a7d70fc827a4 100644 (file)
@@ -56,16 +56,14 @@ send_delta_rk(struct delta_router_key const *delta, void *arg)
 }
 
 int
-handle_serial_query_pdu(struct rtr_request *request, struct rtr_pdu *pdu)
+handle_serial_query_pdu(struct rtr_request *request)
 {
-       struct serial_query_pdu *sq;
        struct send_delta_args args;
        serial_t final_serial;
        int error;
 
-       sq = &pdu->obj.sq;
        args.fd = request->fd;
-       args.rtr_version = sq->header.version;
+       args.rtr_version = request->pdu.rtr_version;
        args.cache_response_sent = false;
 
        /*
@@ -75,9 +73,9 @@ handle_serial_query_pdu(struct rtr_request *request, struct rtr_pdu *pdu)
         * the mismatch MUST immediately terminate the session with an Error
         * Report PDU with code 0 ("Corrupt Data")"
         */
-       if (sq->header.m.session_id != get_current_session_id(args.rtr_version))
+       if (request->pdu.obj.sq.session_id != get_current_session_id(args.rtr_version))
                return err_pdu_send_corrupt_data(args.fd, args.rtr_version,
-                       &pdu->raw, "Session ID doesn't match.");
+                       &request->pdu.raw, "Session ID doesn't match.");
 
        /*
         * For the record, there are two reasons why we want to work on a
@@ -88,8 +86,8 @@ handle_serial_query_pdu(struct rtr_request *request, struct rtr_pdu *pdu)
         *    PDUs, to minimize writer stagnation.
         */
 
-       error = vrps_foreach_delta_since(sq->serial_number, &final_serial,
-           send_delta_vrp, send_delta_rk, &args);
+       error = vrps_foreach_delta_since(request->pdu.obj.sq.serial_number,
+           &final_serial, send_delta_vrp, send_delta_rk, &args);
        switch (error) {
        case 0:
                /*
@@ -164,7 +162,7 @@ send_base_router_key(struct router_key const *key, void *arg)
 }
 
 int
-handle_reset_query_pdu(struct rtr_request *request, struct rtr_pdu *pdu)
+handle_reset_query_pdu(struct rtr_request *request)
 {
        struct base_roa_args args;
        serial_t current_serial;
@@ -172,7 +170,7 @@ handle_reset_query_pdu(struct rtr_request *request, struct rtr_pdu *pdu)
 
        args.started = false;
        args.fd = request->fd;
-       args.version = pdu->obj.hdr.version;
+       args.version = request->pdu.rtr_version;
 
        error = get_last_serial_number(&current_serial);
        switch (error) {
@@ -216,23 +214,3 @@ handle_reset_query_pdu(struct rtr_request *request, struct rtr_pdu *pdu)
 
        return send_end_of_data_pdu(args.fd, args.version, current_serial);
 }
-
-int
-handle_error_report_pdu(struct rtr_request *request, struct rtr_pdu *pdu)
-{
-       struct error_report_pdu *er;
-       char const *error_name;
-
-       er = &pdu->obj.er;
-       error_name = err_pdu_to_string(er->header.m.error_code);
-
-       if (er->errmsg != NULL) {
-               pr_op_info("RTR client %s responded with error PDU '%s' ('%s'). Closing socket.",
-                   request->client_addr, error_name, er->errmsg);
-       } else {
-               pr_op_info("RTR client %s responded with error PDU '%s'. Closing socket.",
-                   request->client_addr, error_name);
-       }
-
-       return -EINVAL;
-}
index 83b39b012b1061ee4522a50f51b3d8f8cffaac58..406827a50ad707e4af46e0c73124b1ea11719de8 100644 (file)
@@ -3,8 +3,7 @@
 
 #include "rtr/pdu_stream.h"
 
-int handle_serial_query_pdu(struct rtr_request *, struct rtr_pdu *);
-int handle_reset_query_pdu(struct rtr_request *, struct rtr_pdu *);
-int handle_error_report_pdu(struct rtr_request *, struct rtr_pdu *);
+int handle_serial_query_pdu(struct rtr_request *);
+int handle_reset_query_pdu(struct rtr_request *);
 
 #endif /* SRC_RTR_PDU_HANDLER_H_ */
index 9c9424dcbe44b891938609f4eb68a9754e40db15..41d347a912177c9235fcdca70148fe3173ea83b0 100644 (file)
@@ -173,7 +173,7 @@ send_router_key_pdu(int fd, uint8_t version,
 int
 send_end_of_data_pdu(int fd, uint8_t version, serial_t end_serial)
 {
-       static const uint8_t type = PDU_TYPE_ROUTER_KEY;
+       static const uint8_t type = PDU_TYPE_END_OF_DATA;
        unsigned char data[
            MAX(RTRPDU_END_OF_DATA_V1_LEN, RTRPDU_END_OF_DATA_V0_LEN)
        ];
@@ -228,7 +228,7 @@ send_error_report_pdu(int fd, uint8_t version, uint16_t code,
        len = rtrpdu_error_report_len(error_pdu_len, error_msg_len);
        data = pmalloc(len);
 
-       buf = serialize_hdr(data, version, type, 0, len);
+       buf = serialize_hdr(data, version, type, code, len);
        buf = write_uint32(buf, error_pdu_len);
        if (error_pdu_len > 0) {
                memcpy(buf, request->bytes, error_pdu_len);
index b2fb6090c06055ea16ac7c79526598703bb1b66f..066ad7ce0785fe7304eaadf903cfb685a8080d42 100644 (file)
@@ -31,6 +31,17 @@ struct pdu_stream {
        unsigned char *end;
 };
 
+struct pdu_header {
+       enum rtr_version version;
+       enum pdu_type type;
+       union {
+               uint16_t session_id;
+               uint16_t reserved;
+               uint16_t error_code;
+       } m; /* Note: "m" stands for "meh." I have no idea what to call this. */
+       uint32_t len;
+};
+
 struct pdu_stream *pdustream_create(int fd, char const *addr)
 {
        struct pdu_stream *result;
@@ -88,12 +99,14 @@ update_buffer(struct pdu_stream *in /* "in"put stream */)
                consumed = read(in->fd, in->end, RTRPDU_MAX_LEN2 - get_length(in));
                if (consumed == -1) {
                        error = errno;
-                       if (error == EAGAIN || error == EWOULDBLOCK)
+                       if (error == EAGAIN || error == EWOULDBLOCK) {
+                               pr_op_debug("Reached stream limit for now.");
                                return BS_WOULD_BLOCK;
-
-                       pr_op_err("Client socket read interrupted: %s",
-                           strerror(error));
-                       return BS_ERROR;
+                       } else {
+                               pr_op_err("Client socket read interrupted: %s",
+                                   strerror(error));
+                               return BS_ERROR;
+                       }
                }
 
                if (consumed == 0) {
@@ -107,6 +120,7 @@ update_buffer(struct pdu_stream *in /* "in"put stream */)
         * big PDU that either lengths exactly RTRPDU_MAX_LEN2, or is too big
         * for us to to allow it.
         */
+       pr_op_debug("Stream limit not reached yet.");
        return BS_KEEP_READING;
 }
 
@@ -217,7 +231,7 @@ read_hdr(struct pdu_stream *stream, struct pdu_header *header)
        header->version = stream->start[0];
        header->type = stream->start[1];
        header->m.reserved = read_uint16(stream->start + 2);
-       header->length = read_uint32(stream->start + 4);
+       header->len = read_uint32(stream->start + 4);
 }
 
 static int
@@ -273,124 +287,154 @@ unexpected:
 
 static int
 load_serial_query(struct pdu_stream *stream, struct pdu_header *hdr,
-    struct rtr_pdu *result)
+    struct rtr_request *result)
 {
-       if (hdr->length != RTRPDU_SERIAL_QUERY_LEN) {
+       size_t length;
+
+       if (hdr->len != RTRPDU_SERIAL_QUERY_LEN) {
+               pr_op_err("%s: Header length is not %u: %u",
+                   stream->addr, RTRPDU_SERIAL_QUERY_LEN, hdr->len);
                return err_pdu_send_invalid_request(
-                       stream->fd, stream->rtr_version, &result->raw,
+                       stream->fd, stream->rtr_version, &result->pdu.raw,
                        "Expected length 12 for Serial Query PDUs."
                );
        }
-       if (get_length(stream) < RTRPDU_SERIAL_QUERY_LEN)
+
+       length = get_length(stream);
+       if (length < RTRPDU_SERIAL_QUERY_LEN) {
+               pr_op_debug("PDU fragmented after hdr (%zu)", length);
                return EAGAIN;
+       }
 
        pr_op_debug("Received a Serial Query from %s.", stream->addr);
 
-       memcpy(&result->obj.sq.header, hdr, sizeof(*hdr));
+       result->pdu.obj.sq.session_id = hdr->m.session_id;
        stream->start += RTR_HDR_LEN;
-       result->obj.sq.serial_number = read_uint32(stream->start);
+       result->pdu.obj.sq.serial_number = read_uint32(stream->start);
        stream->start += 4;
-
        return 0;
 }
 
 static int
 load_reset_query(struct pdu_stream *stream, struct pdu_header *hdr,
-    struct rtr_pdu *result)
+    struct rtr_request *result)
 {
-       if (hdr->length != RTRPDU_RESET_QUERY_LEN) {
+       size_t length;
+
+       if (hdr->len != RTRPDU_RESET_QUERY_LEN) {
+               pr_op_err("%s: Header length is not %u: %u",
+                   stream->addr, RTRPDU_RESET_QUERY_LEN, hdr->len);
                return err_pdu_send_invalid_request(
-                       stream->fd, stream->rtr_version, &result->raw,
+                       stream->fd, stream->rtr_version, &result->pdu.raw,
                        "Expected length 8 for Reset Query PDUs."
                );
        }
-       if (get_length(stream) < RTRPDU_RESET_QUERY_LEN)
+
+       length = get_length(stream);
+       if (length < RTRPDU_RESET_QUERY_LEN) {
+               pr_op_debug("PDU fragmented after hdr (%zu)", length);
                return EAGAIN;
+       }
 
        pr_op_debug("Received a Reset Query from %s.", stream->addr);
 
-       memcpy(&result->obj.rq.header, hdr, sizeof(*hdr));
        stream->start += RTR_HDR_LEN;
-
        return 0;
 }
 
+static void
+handle_error_report_pdu(uint16_t errcode, char const *errmsg,
+    char const *client_addr)
+{
+       if (errmsg != NULL) {
+               pr_op_err("RTR client %s responded with error PDU '%s' ('%s'). Closing socket.",
+                   client_addr, err_pdu_to_string(errcode), errmsg);
+       } else {
+               pr_op_err("RTR client %s responded with error PDU '%s'. Closing socket.",
+                   client_addr, err_pdu_to_string(errcode));
+       }
+}
+
+
 static int
-load_error_report(struct pdu_stream *stream, struct pdu_header *hdr,
-    struct rtr_pdu *result)
+load_error_report(struct pdu_stream *stream, struct pdu_header *hdr)
 {
-       struct error_report_pdu *pdu;
+       uint32_t errpdu_len;
+       uint32_t errmsg_len;
+       char *errmsg;
        int error;
 
-       if (hdr->length > RTRPDU_ERROR_REPORT_MAX_LEN) {
+       if (hdr->len > RTRPDU_ERROR_REPORT_MAX_LEN) {
                return pr_op_err(
                        "RTR client %s sent a large Error Report PDU (%u bytes). This looks broken, so I'm dropping the connection.",
-                       stream->addr, hdr->length
+                       stream->addr, hdr->len
                );
        }
 
        pr_op_debug("Received an Error Report from %s.", stream->addr);
 
-       pdu = &result->obj.er;
-
        /* Header */
-       memcpy(&pdu->header, hdr, sizeof(*hdr));
        stream->start += RTR_HDR_LEN;
 
        /* Error PDU length */
        if (get_length(stream) < 4) {
+               pr_op_debug("Fragmented on error PDU length.");
                error = EAGAIN;
                goto revert_hdr;
        }
-       pdu->errpdu_len = read_uint32(stream->start);
+       errpdu_len = read_uint32(stream->start);
        stream->start += 4;
-       if (pdu->errpdu_len > RTRPDU_MAX_LEN) {
+       if (errpdu_len > RTRPDU_MAX_LEN) {
                /*
                 * We truncate PDUs larger than RTRPDU_MAX_LEN, so we couldn't
                 * have sent this PDU. Looks like someone is messing with us.
                 */
                error = pr_op_err(
                        "RTR client %s sent an Error Report PDU containing a large error PDU (%u bytes). This looks broken/insecure; I'm dropping the connection.",
-                       stream->addr, pdu->errpdu_len
+                       stream->addr, errpdu_len
                );
                goto revert_errpdu_len;
        }
 
        /* Error PDU */
-       if (get_length(stream) < pdu->errpdu_len) {
+       if (get_length(stream) < errpdu_len) {
+               pr_op_debug("Fragmented on error PDU.");
                error = EAGAIN;
                goto revert_errpdu_len;
        }
 
-       memcpy(pdu->errpdu, stream->start, pdu->errpdu_len);
-       stream->start += pdu->errpdu_len;
+       stream->start += errpdu_len; /* Skip it for now; we don't use it */
 
        /* Error msg length */
        if (get_length(stream) < 4) {
+               pr_op_debug("Fragmented on error message length.");
                error = EAGAIN;
                goto revert_errpdu;
        }
-       pdu->errmsg_len = read_uint32(stream->start);
+       errmsg_len = read_uint32(stream->start);
        stream->start += 4;
-       if (hdr->length != rtrpdu_error_report_len(pdu->errpdu_len, pdu->errmsg_len)) {
+       if (hdr->len != rtrpdu_error_report_len(errpdu_len, errmsg_len)) {
                error = pr_op_err(
                        "RTR client %s sent a malformed Error Report PDU; header length is %u, but effective length is %u + %u + %u + %u + %u.",
-                       stream->addr, hdr->length,
-                       RTR_HDR_LEN, 4, pdu->errpdu_len, 4, pdu->errmsg_len
+                       stream->addr, hdr->len,
+                       RTR_HDR_LEN, 4, errpdu_len, 4, errmsg_len
                );
                goto revert_errmsg_len;
        }
 
        /* Error msg */
-       pdu->errmsg = read_string(stream, pdu->errmsg_len);
-       stream->start += pdu->errmsg_len;
+       errmsg = read_string(stream, errmsg_len);
+       stream->start += errmsg_len;
 
-       return 0;
+       handle_error_report_pdu(hdr->m.error_code, errmsg, stream->addr);
+
+       free(errmsg);
+       return EINVAL;
 
 revert_errmsg_len:
        stream->start -= 4;
 revert_errpdu:
-       stream->start -= pdu->errpdu_len;
+       stream->start -= errpdu_len;
 revert_errpdu_len:
        stream->start -= 4;
 revert_hdr:
@@ -398,119 +442,162 @@ revert_hdr:
        return error;
 }
 
+static struct rtr_request *
+create_request(struct pdu_stream *stream, struct pdu_header *hdr,
+    struct rtr_buffer *raw)
+{
+       struct rtr_request *result;
+
+       result = pmalloc(sizeof(struct rtr_request));
+       result->fd = stream->fd;
+       strcpy(result->client_addr, stream->addr);
+       result->pdu.rtr_version = hdr->version;
+       result->pdu.type = hdr->type;
+       result->pdu.raw = *raw;
+       result->eos = false;
+
+       return result;
+}
+
 /*
+ * Returns the next "usable" PDU in the stream. Does not block.
+ *
+ * I call it "usable" because there can technically be multiple PDUs in the
+ * stream, and if that happens, it doesn't make sense to handle all of them
+ * sequentially. So there's no queuing.
+ *
+ * If there is at least one Error Report, it'll induce end of stream. This is
+ * because all the currently defined client-sourced Error Reports are fatal.
+ * The caller does not need to concern itself with handling Error Reports.
+ *
+ * Otherwise, if there are multiple PDUs in the stream, the last one will be
+ * returned, and the rest will be ignored.
+ * This is because Serial Queries and Reset Queries are the only non-error PDUs
+ * the server is supposed to receive, and they serve the same purpose. If the
+ * client sent two of them (maybe because we took too long to respond for some
+ * reason), it's most likely given up on the old one.
+ *
+ * Just to be clear, most of this is probably just paranoid error handling; a
+ * well-behaving client will never send us multiple PDUs in quick succession.
+ * But we don't know for sure how much buffering the underlying socket does,
+ * so we want to do something sensible if it happens.
+ *
  * Returns:
- * == 0: Success; at least zero PDUs read.
- * != 0: Communication broken; close the connection.
+ * true: Success. @result might or might not point to a PDU; check NULL.
+ * false: Communication ended or broken; close the connection, ignore @result.
  */
-int
-pdustream_next(struct pdu_stream *stream, struct rtr_request **_result)
+bool
+pdustream_next(struct pdu_stream *stream, struct rtr_request **result)
 {
        enum buffer_state state;
        struct pdu_header hdr;
-       struct rtr_request *result;
-       struct rtr_pdu *pdu;
+       struct rtr_buffer raw = { 0 };
+       struct rtr_request *request = NULL;
+       struct rtr_request *request_tmp;
        size_t remainder;
        int error;
 
-       result = pmalloc(sizeof(struct rtr_request));
-       result->fd = stream->fd;
-       strcpy(result->client_addr, stream->addr);
-       STAILQ_INIT(&result->pdus);
-       result->eos = false;
-
-       pdu = NULL;
+       *result = NULL;
 
 again:
        state = update_buffer(stream);
-       if (state == BS_ERROR) {
-               error = EINVAL;
-               goto fail;
-       }
+       if (state == BS_ERROR)
+               return false;
 
        while (stream->start < stream->end) {
+               request_tmp = NULL;
                remainder = get_length(stream);
 
                /* Read header. */
-               if (remainder < RTR_HDR_LEN)
-                       break;
+               if (remainder < RTR_HDR_LEN) {
+                       pr_op_debug("PDU fragmented on header (%zu)", remainder);
+                       break; /* PDU is fragmented */
+               }
                read_hdr(stream, &hdr);
 
                /* Init raw PDU; Needed early because of error responses. */
-               pdu = pzalloc(sizeof(struct rtr_pdu));
-               pdu->raw.bytes_len = (hdr.length <= remainder)
-                   ? hdr.length : remainder;
-               pdu->raw.bytes = pmalloc(pdu->raw.bytes_len);
-               memcpy(pdu->raw.bytes, stream->start, pdu->raw.bytes_len);
+               raw.bytes_len = (hdr.len <= remainder) ? hdr.len : remainder;
+               raw.bytes = pmalloc(raw.bytes_len);
+               memcpy(raw.bytes, stream->start, raw.bytes_len);
 
                /* Validate length; Needs raw. */
-               if (hdr.length > RTRPDU_MAX_LEN2) {
-                       error = err_pdu_send_invalid_request(
+               if (hdr.len > RTRPDU_MAX_LEN2) {
+                       pr_op_err("%s: Header length too big: %u > %u",
+                            stream->addr, hdr.len, RTRPDU_MAX_LEN2);
+                       err_pdu_send_invalid_request(
                                stream->fd,
                                (stream->rtr_version != -1)
                                    ? stream->rtr_version
                                    : hdr.version,
-                               &pdu->raw,
+                               &raw,
                                "PDU is too large."
                        );
                        goto fail;
                }
 
-               if (remainder < hdr.length) {
-                       free(pdu->raw.bytes);
-                       free(pdu);
-                       break;
-               }
-
                /* Validate version; Needs raw. */
-               error = validate_rtr_version(stream, &hdr, &pdu->raw);
-               if (error)
+               if (validate_rtr_version(stream, &hdr, &raw) != 0) {
+                       pr_op_err("%s: Bad RTR version: %u",
+                           stream->addr, hdr.version);
                        goto fail;
+               }
+
+               request_tmp = create_request(stream, &hdr, &raw);
+               raw.bytes = NULL; /* Ownership transferred */
 
                switch (hdr.type) {
                case PDU_TYPE_SERIAL_QUERY:
-                       error = load_serial_query(stream, &hdr, pdu);
+                       error = load_serial_query(stream, &hdr, request_tmp);
                        break;
                case PDU_TYPE_RESET_QUERY:
-                       error = load_reset_query(stream, &hdr, pdu);
+                       error = load_reset_query(stream, &hdr, request_tmp);
                        break;
                case PDU_TYPE_ERROR_REPORT:
-                       error = load_error_report(stream, &hdr, pdu);
+                       error = load_error_report(stream, &hdr);
                        break;
                default:
+                       pr_op_err("%s: Unknown PDU type: %u",
+                           stream->addr, hdr.version);
                        err_pdu_send_unsupported_pdu_type(stream->fd,
-                           stream->rtr_version, &pdu->raw);
-                       error = ENOTSUP;
+                           stream->rtr_version, &request_tmp->pdu.raw);
+                       goto fail;
                }
 
-               if (error)
+               if (error == EAGAIN) {
+                       rtreq_destroy(request_tmp);
+                       break;
+               } else if (error) {
                        goto fail;
+               }
 
-               STAILQ_INSERT_TAIL(&result->pdus, pdu, hook);
+               if (request != NULL)
+                       rtreq_destroy(request);
+               request = request_tmp;
        }
 
-       *_result = result;
-
        switch (state) {
-       case BS_WOULD_BLOCK:
-               result->eos = false;
-               return 0;
        case BS_EOS:
-               result->eos = true;
-               return 0;
+               if (request == NULL)
+                       return false;
+               request->eos = true;
+               /* Fall through */
+       case BS_WOULD_BLOCK:
+               *result = request;
+               return true;
        case BS_KEEP_READING:
                goto again;
-       default:
-               error = EINVAL;
+       case BS_ERROR:
+               pr_crit("This should have been catched earlier.");
        }
 
 fail:
-       if (pdu != NULL) {
-               free(pdu->raw.bytes);
-               free(pdu);
-       }
-       rtreq_destroy(result);
-       return error;
+       if (request != NULL)
+               rtreq_destroy(request);
+       if (request_tmp != NULL)
+               rtreq_destroy(request_tmp);
+       if (raw.bytes != NULL)
+               free(raw.bytes);
+       return false;
 }
 
 int
@@ -534,16 +621,7 @@ pdustream_version(struct pdu_stream *stream)
 void
 rtreq_destroy(struct rtr_request *request)
 {
-       struct rtr_pdu *pdu;
-
-       while (!STAILQ_EMPTY(&request->pdus)) {
-               pdu = STAILQ_FIRST(&request->pdus);
-               STAILQ_REMOVE_HEAD(&request->pdus, hook);
-
-               if (pdu->obj.hdr.type == PDU_TYPE_ERROR_REPORT)
-                       free(pdu->obj.er.errmsg);
-               free(pdu->raw.bytes);
-               free(pdu);
-       }
+       free(request->pdu.raw.bytes);
+       free(request);
 }
 
index db66ea23573eb3e4f0d46fec4eb619ae3d966b45..e0f279e98596ecdba596b5309ae802b1c0fda98a 100644 (file)
@@ -1,41 +1,34 @@
 #ifndef SRC_RTR_PDU_STREAM_H_
 #define SRC_RTR_PDU_STREAM_H_
 
-#include <sys/queue.h>
-
 #include "rtr/pdu.h"
 #include "rtr/rtr.h"
 #include "data_structure/array_list.h"
 
 struct pdu_stream; /* It's an *input* stream. */
 
-struct rtr_pdu {
-       /* Deserialized version */
-       union {
-               struct pdu_header hdr;
-               struct serial_query_pdu sq;
-               struct reset_query_pdu rq;
-               struct error_report_pdu er;
-       } obj;
-
-       /*
-        * Serialized version.
-        * Can be truncated; use for responding errors only.
-        */
-       struct rtr_buffer raw;
-
-       STAILQ_ENTRY(rtr_pdu) hook;
-};
-
 struct rtr_request {
        int fd;
        char client_addr[INET6_ADDRSTRLEN];
 
-       /*
-        * It's not sensible for a request to contain multiple PDUs,
-        * but I don't know how much buffering the underlying socket has.
-        */
-       STAILQ_HEAD(, rtr_pdu) pdus;
+       struct {
+               enum rtr_version rtr_version;
+               enum pdu_type type;
+
+               /* Deserialized version */
+               union {
+                       struct {
+                               uint16_t session_id;
+                               uint32_t serial_number;
+                       } sq; /* Serial Query */
+               } obj;
+
+               /*
+                * Serialized version.
+                * Can be truncated; use for responding errors only.
+                */
+               struct rtr_buffer raw;
+       } pdu;
 
        bool eos; /* end of stream */
 };
@@ -43,7 +36,7 @@ struct rtr_request {
 struct pdu_stream *pdustream_create(int, char const *);
 void pdustream_destroy(struct pdu_stream **);
 
-int pdustream_next(struct pdu_stream *, struct rtr_request **);
+bool pdustream_next(struct pdu_stream *, struct rtr_request **);
 int pdustream_fd(struct pdu_stream *);
 char const *pdustream_addr(struct pdu_stream *);
 int pdustream_version(struct pdu_stream *);
index 908b9cf09f7ae236c8b431bbefd18cdc778cd650..05b1fc04a629c05c0e31b8aa1a5ddbaa7fe33a11 100644 (file)
@@ -318,27 +318,21 @@ static void
 handle_client_request(void *arg)
 {
        struct rtr_request *request = arg;
-       struct rtr_pdu *pdu;
 
-       STAILQ_FOREACH(pdu, &request->pdus, hook) {
-               switch (pdu->obj.hdr.type) {
-               case PDU_TYPE_SERIAL_QUERY:
-                       handle_serial_query_pdu(request, pdu);
-                       break;
-               case PDU_TYPE_RESET_QUERY:
-                       handle_reset_query_pdu(request, pdu);
-                       break;
-               case PDU_TYPE_ERROR_REPORT:
-                       handle_error_report_pdu(request, pdu);
-                       break;
-               default:
-                       /* Should have been catched during constructor */
-                       pr_crit("Unexpected PDU type: %u", pdu->obj.hdr.type);
-               }
+       switch (request->pdu.type) {
+       case PDU_TYPE_SERIAL_QUERY:
+               handle_serial_query_pdu(request);
+               break;
+       case PDU_TYPE_RESET_QUERY:
+               handle_reset_query_pdu(request);
+               break;
+       default:
+               /* Should have been catched during constructor */
+               pr_crit("Unexpected PDU type: %u", request->pdu.type);
        }
 
        if (request->eos)
-               /* Wake poller to close the socket. Read side already shut. */
+               /* Wake poller to close the socket */
                shutdown(request->fd, SHUT_WR);
 
        rtreq_destroy(request);
@@ -442,16 +436,12 @@ static bool
 __handle_client_request(struct pdu_stream *stream)
 {
        struct rtr_request *request;
-       bool eos;
 
-       if (pdustream_next(stream, &request) != 0)
+       if (!pdustream_next(stream, &request))
                return false;
 
-       if (STAILQ_EMPTY(&request->pdus)) {
-               eos = request->eos;
-               free(request);
-               return !eos;
-       }
+       if (request == NULL)
+               return true;
 
        thread_pool_push(request_handlers, "RTR request", handle_client_request,
            request);
@@ -563,7 +553,7 @@ fddb_poll(void)
                }
        }
 
-       /* The servers might change this number, so store a backup. */
+       /* accept_new_client() might change this number, so store a backup. */
        nclients = clients.len;
 
        /* New connections */
@@ -599,9 +589,6 @@ fddb_poll(void)
                /* PR_DEBUG_MSG("Client %u: fd:%d revents:%x", i, fd->fd,
                    fd->revents); */
 
-//             if (fd->fd == -1)
-//                     continue;
-
                if (fd->revents & (POLLHUP | POLLERR | POLLNVAL)) {
                        fd->fd = -1;
                } else if (fd->revents & POLLIN) {
index 48b9c921264fe33b21c3443d65c4f73c78d9bf0b..e092ee6093ab76e196fa251ab50fbfca42187f71 100644 (file)
@@ -105,43 +105,33 @@ init_db_full(void)
 }
 
 static void
-init_request(struct rtr_request *request, struct rtr_pdu *pdu)
-{
-       request->fd = 0;
-       strcpy(request->client_addr, "192.0.2.1");
-       STAILQ_INIT(&request->pdus);
-       if (pdu != NULL)
-               STAILQ_INSERT_TAIL(&request->pdus, pdu, hook);
-       request->eos = false;
-}
-
-static void
-init_reset_query(struct rtr_pdu *pdu)
+init_reset_query(struct rtr_request *request)
 {
        static unsigned char raw[] = { 1, 2, 0, 0, 0, 0, 0, 8 };
 
-       pdu->obj.rq.header.version = RTR_V1;
-       pdu->obj.rq.header.type = PDU_TYPE_RESET_QUERY;
-       pdu->obj.rq.header.m.reserved = 0;
-       pdu->obj.rq.header.length = 8;
-       pdu->raw.bytes = raw;
-       pdu->raw.bytes_len = sizeof(raw);
-       memset(&pdu->hook, 0, sizeof(pdu->hook));
+       request->fd = 0;
+       strcpy(request->client_addr, "192.0.2.1");
+       request->pdu.rtr_version = RTR_V1;
+       request->pdu.type = PDU_TYPE_RESET_QUERY;
+       request->pdu.raw.bytes = raw;
+       request->pdu.raw.bytes_len = sizeof(raw);
+       request->eos = true;
 }
 
 static void
-init_serial_query(struct rtr_pdu *pdu, uint32_t serial)
+init_serial_query(struct rtr_request *request, uint32_t serial)
 {
        static unsigned char raw[] = { 1, 1, 0, 0, 0, 0, 0, 12, 0, 0, 0, 0 };
 
-       pdu->obj.sq.header.version = RTR_V1;
-       pdu->obj.sq.header.type = PDU_TYPE_SERIAL_QUERY;
-       pdu->obj.sq.header.m.session_id = get_current_session_id(RTR_V1);
-       pdu->obj.sq.header.length = 12;
-       pdu->obj.sq.serial_number = serial;
-       pdu->raw.bytes = raw;
-       pdu->raw.bytes_len = sizeof(raw);
-       memset(&pdu->hook, 0, sizeof(pdu->hook));
+       request->fd = 0;
+       strcpy(request->client_addr, "192.0.2.1");
+       request->pdu.rtr_version = RTR_V1;
+       request->pdu.type = PDU_TYPE_SERIAL_QUERY;
+       request->pdu.obj.sq.session_id = get_current_session_id(RTR_V1);
+       request->pdu.obj.sq.serial_number = serial;
+       request->pdu.raw.bytes = raw;
+       request->pdu.raw.bytes_len = sizeof(raw);
+       request->eos = true;
 }
 
 /* Mocks */
@@ -253,14 +243,12 @@ send_error_report_pdu(int fd, uint8_t version, uint16_t code,
 START_TEST(test_start_or_restart)
 {
        struct rtr_request request;
-       struct rtr_pdu client_pdu;
 
        pr_op_info("-- Start or Restart --");
 
        /* Init */
        init_db_full();
-       init_reset_query(&client_pdu);
-       init_request(&request, &client_pdu);
+       init_reset_query(&request);
 
        /* Define expected server response */
        expected_pdu_add(PDU_TYPE_CACHE_RESPONSE);
@@ -270,7 +258,7 @@ START_TEST(test_start_or_restart)
        expected_pdu_add(PDU_TYPE_END_OF_DATA);
 
        /* Run and validate */
-       ck_assert_int_eq(0, handle_reset_query_pdu(&request, &client_pdu));
+       ck_assert_int_eq(0, handle_reset_query_pdu(&request));
        ck_assert_uint_eq(false, has_expected_pdus());
 
        /* Clean up */
@@ -282,25 +270,23 @@ END_TEST
 START_TEST(test_typical_exchange)
 {
        struct rtr_request request;
-       struct rtr_pdu client_pdu;
 
        pr_op_info("-- Typical Exchange --");
 
        /* Init */
        init_db_full();
-       init_serial_query(&client_pdu, 0);
-       init_request(&request, &client_pdu);
+       init_serial_query(&request, 0);
 
        /* From serial 0: Define expected server response */
        /* Server doesn't have serial 0. */
        expected_pdu_add(PDU_TYPE_CACHE_RESET);
 
        /* From serial 0: Run and validate */
-       ck_assert_int_eq(0, handle_serial_query_pdu(&request, &client_pdu));
+       ck_assert_int_eq(0, handle_serial_query_pdu(&request));
        ck_assert_uint_eq(false, has_expected_pdus());
 
        /* From serial 1: Init client request */
-       init_serial_query(&client_pdu, 1);
+       init_serial_query(&request, 1);
 
        /* From serial 1: Define expected server response */
        expected_pdu_add(PDU_TYPE_CACHE_RESPONSE);
@@ -313,11 +299,11 @@ START_TEST(test_typical_exchange)
        expected_pdu_add(PDU_TYPE_END_OF_DATA);
 
        /* From serial 1: Run and validate */
-       ck_assert_int_eq(0, handle_serial_query_pdu(&request, &client_pdu));
+       ck_assert_int_eq(0, handle_serial_query_pdu(&request));
        ck_assert_uint_eq(false, has_expected_pdus());
 
        /* From serial 2: Init client request */
-       init_serial_query(&client_pdu, 2);
+       init_serial_query(&request, 2);
 
        /* From serial 2: Define expected server response */
        expected_pdu_add(PDU_TYPE_CACHE_RESPONSE);
@@ -327,18 +313,18 @@ START_TEST(test_typical_exchange)
        expected_pdu_add(PDU_TYPE_END_OF_DATA);
 
        /* From serial 2: Run and validate */
-       ck_assert_int_eq(0, handle_serial_query_pdu(&request, &client_pdu));
+       ck_assert_int_eq(0, handle_serial_query_pdu(&request));
        ck_assert_uint_eq(false, has_expected_pdus());
 
        /* From serial 3: Init client request */
-       init_serial_query(&client_pdu, 3);
+       init_serial_query(&request, 3);
 
        /* From serial 3: Define expected server response */
        expected_pdu_add(PDU_TYPE_CACHE_RESPONSE);
        expected_pdu_add(PDU_TYPE_END_OF_DATA);
 
        /* From serial 3: Run and validate */
-       ck_assert_int_eq(0, handle_serial_query_pdu(&request, &client_pdu));
+       ck_assert_int_eq(0, handle_serial_query_pdu(&request));
        ck_assert_uint_eq(false, has_expected_pdus());
 
        /* Clean up */
@@ -350,20 +336,18 @@ END_TEST
 START_TEST(test_no_incremental_update_available)
 {
        struct rtr_request request;
-       struct rtr_pdu client_pdu;
 
        pr_op_info("-- No Incremental Update Available --");
 
        /* Init */
        init_db_full();
-       init_serial_query(&client_pdu, 10000);
-       init_request(&request, &client_pdu);
+       init_serial_query(&request, 10000);
 
        /* Define expected server response */
        expected_pdu_add(PDU_TYPE_CACHE_RESET);
 
        /* Run and validate */
-       ck_assert_int_eq(0, handle_serial_query_pdu(&request, &client_pdu));
+       ck_assert_int_eq(0, handle_serial_query_pdu(&request));
        ck_assert_uint_eq(false, has_expected_pdus());
 
        /* The Reset Query is already tested in start_or_restart. */
@@ -377,38 +361,31 @@ END_TEST
 START_TEST(test_cache_has_no_data_available)
 {
        struct rtr_request request;
-       struct rtr_pdu serial_query;
-       struct rtr_pdu reset_query;
 
        pr_op_info("-- Cache Has No Data Available --");
 
        /* Init */
        ck_assert_int_eq(0, vrps_init());
-       init_request(&request, NULL);
 
        /* Serial Query: Init client request */
-       init_serial_query(&serial_query, 0);
-       STAILQ_INSERT_TAIL(&request.pdus, &serial_query, hook);
+       init_serial_query(&request, 0);
 
        /* Serial Query: Define expected server response */
        expected_pdu_add(PDU_TYPE_ERROR_REPORT);
 
        /* Serial Query: Run and validate */
-       ck_assert_int_eq(0, handle_serial_query_pdu(&request, &serial_query));
+       ck_assert_int_eq(0, handle_serial_query_pdu(&request));
        ck_assert_uint_eq(false, has_expected_pdus());
-       STAILQ_REMOVE_HEAD(&request.pdus, hook);
 
        /* Reset Query: Init client request */
-       init_reset_query(&reset_query);
-       STAILQ_INSERT_TAIL(&request.pdus, &reset_query, hook);
+       init_reset_query(&request);
 
        /* Reset Query: Define expected server response */
        expected_pdu_add(PDU_TYPE_ERROR_REPORT);
 
        /* Reset Query: Run and validate */
-       ck_assert_int_eq(0, handle_reset_query_pdu(&request, &reset_query));
+       ck_assert_int_eq(0, handle_reset_query_pdu(&request));
        ck_assert_uint_eq(false, has_expected_pdus());
-       STAILQ_REMOVE_HEAD(&request.pdus, hook);
 
        /* Clean up */
        vrps_destroy();
@@ -418,21 +395,19 @@ END_TEST
 START_TEST(test_bad_session_id)
 {
        struct rtr_request request;
-       struct rtr_pdu client_pdu;
 
        pr_op_info("-- Bad Session ID --");
 
        /* Init */
        init_db_full();
-       init_serial_query(&client_pdu, 0);
-       client_pdu.obj.sq.header.m.session_id++;
-       init_request(&request, &client_pdu);
+       init_serial_query(&request, 0);
+       request.pdu.obj.sq.session_id++;
 
        /* From serial 0: Define expected server response */
        expected_pdu_add(PDU_TYPE_ERROR_REPORT);
 
        /* From serial 0: Run and validate */
-       ck_assert_int_eq(-EINVAL, handle_serial_query_pdu(&request, &client_pdu));
+       ck_assert_int_eq(-EINVAL, handle_serial_query_pdu(&request));
        ck_assert_uint_eq(false, has_expected_pdus());
 
        /* Clean up */
index a3b52713df860830261def32e28153e462b5f30e..073479f1c025dc5955ca98b3890cf345df024932 100644 (file)
@@ -17,6 +17,13 @@ MOCK_ABORT_INT(err_pdu_send_unsupported_pdu_type, int fd, uint8_t version,
 MOCK_ABORT_INT(err_pdu_send_unexpected_proto_version, int fd, uint8_t version,
     struct rtr_buffer const *request, char const *msg)
 
+char const *
+err_pdu_to_string(uint16_t c)
+{
+       ck_assert_uint_eq(c, 0x1617);
+       return "achoo";
+}
+
 /* End of mocks */
 
 static void
@@ -54,18 +61,6 @@ create_stream_fd(unsigned char *data, size_t datalen, int rtr_version)
        return result;
 }
 
-static void
-assert_pdu_count(unsigned int expected, struct rtr_request *request)
-{
-       struct rtr_pdu *pdu;
-       unsigned int npdu;
-
-       npdu = 0;
-       STAILQ_FOREACH(pdu, &request->pdus, hook)
-               npdu++;
-       ck_assert_uint_eq(expected, npdu);
-}
-
 START_TEST(test_pdu_header_from_stream)
 {
        unsigned char input[] = { 0, 1, 2, 3, 4, 5, 6, 7 };
@@ -78,7 +73,7 @@ START_TEST(test_pdu_header_from_stream)
        ck_assert_uint_eq(hdr.version, 0);
        ck_assert_uint_eq(hdr.type, 1);
        ck_assert_uint_eq(hdr.m.reserved, 0x0203);
-       ck_assert_uint_eq(hdr.length, 0x04050607);
+       ck_assert_uint_eq(hdr.len, 0x04050607);
 
        free(stream);
 }
@@ -94,25 +89,19 @@ START_TEST(test_serial_query_from_stream)
        };
        struct pdu_stream *stream;
        struct rtr_request *request;
-       struct rtr_pdu *pdu;
-       struct serial_query_pdu *sq;
 
        stream = create_stream_fd(input, sizeof(input), RTR_V1);
-       ck_assert_int_eq(0, pdustream_next(stream, &request));
+       ck_assert_uint_eq(true, pdustream_next(stream, &request));
 
        ck_assert_int_eq(stream->fd, request->fd);
        ck_assert_str_eq(stream->addr, request->client_addr);
+       ck_assert_uint_eq(RTR_V1, request->pdu.rtr_version);
+       ck_assert_uint_eq(PDU_TYPE_SERIAL_QUERY, request->pdu.type);
+       ck_assert_uint_eq(0x0e0f1011, request->pdu.obj.sq.serial_number);
+       ck_assert_uint_eq(0x0708, request->pdu.obj.sq.session_id);
+       ck_assert_uint_eq(12, request->pdu.raw.bytes_len);
+       ck_assert(memcmp(input, request->pdu.raw.bytes, 12) == 0);
        ck_assert_uint_eq(1, request->eos);
-       assert_pdu_count(1, request);
-
-       pdu = STAILQ_FIRST(&request->pdus);
-       sq = &pdu->obj.sq;
-
-       ck_assert_uint_eq(sq->header.version, RTR_V1);
-       ck_assert_uint_eq(sq->header.type, PDU_TYPE_SERIAL_QUERY);
-       ck_assert_uint_eq(sq->header.m.reserved, 0x0708);
-       ck_assert_uint_eq(sq->header.length, 12);
-       ck_assert_uint_eq(sq->serial_number, 0x0e0f1011);
 
        rtreq_destroy(request);
        pdustream_destroy(&stream);
@@ -127,27 +116,16 @@ START_TEST(test_reset_query_from_stream)
        };
        struct pdu_stream *stream;
        struct rtr_request *request;
-       struct rtr_pdu *pdu;
-       struct reset_query_pdu *rq;
 
        stream = create_stream_fd(input, sizeof(input), RTR_V0);
-       ck_assert_int_eq(0, pdustream_next(stream, &request));
-
+       ck_assert_uint_eq(true, pdustream_next(stream, &request));
        ck_assert_int_eq(stream->fd, request->fd);
        ck_assert_str_eq(stream->addr, request->client_addr);
+       ck_assert_uint_eq(RTR_V0, request->pdu.rtr_version);
+       ck_assert_uint_eq(PDU_TYPE_RESET_QUERY, request->pdu.type);
+       ck_assert_uint_eq(8, request->pdu.raw.bytes_len);
+       ck_assert(memcmp(input, request->pdu.raw.bytes, 8) == 0);
        ck_assert_uint_eq(1, request->eos);
-       assert_pdu_count(1, request);
-
-       pdu = STAILQ_FIRST(&request->pdus);
-       rq = &pdu->obj.rq;
-
-       ck_assert_uint_eq(rq->header.version, RTR_V0);
-       ck_assert_uint_eq(rq->header.type, PDU_TYPE_RESET_QUERY);
-       ck_assert_uint_eq(rq->header.m.reserved, 0x0c0d);
-       ck_assert_uint_eq(rq->header.length, 8);
-
-       ck_assert_uint_eq(8, pdu->raw.bytes_len);
-       ck_assert(memcmp(input, pdu->raw.bytes, 8) == 0);
 
        rtreq_destroy(request);
        pdustream_destroy(&stream);
@@ -172,60 +150,23 @@ START_TEST(test_error_report_from_stream)
        };
        struct pdu_stream *stream;
        struct rtr_request *request;
-       struct rtr_pdu *pdu;
-       struct error_report_pdu *er;
 
        stream = create_stream_fd(input, sizeof(input), RTR_V1);
-       ck_assert_int_eq(0, pdustream_next(stream, &request));
-
-       ck_assert_int_eq(stream->fd, request->fd);
-       ck_assert_str_eq(stream->addr, request->client_addr);
-       ck_assert_uint_eq(1, request->eos);
-       assert_pdu_count(1, request);
-
-       pdu = STAILQ_FIRST(&request->pdus);
-       er = &pdu->obj.er;
-
-       ck_assert_uint_eq(er->header.version, RTR_V1);
-       ck_assert_uint_eq(er->header.type, PDU_TYPE_ERROR_REPORT);
-       ck_assert_uint_eq(er->header.m.reserved, 0x1617);
-       ck_assert_uint_eq(er->header.length, 33);
-       ck_assert_uint_eq(er->errpdu_len, 12);
-       ck_assert_uint_eq(er->errpdu[0], 1);
-       ck_assert_uint_eq(er->errpdu[1], 0);
-       ck_assert_uint_eq(er->errpdu[2], 2);
-       ck_assert_uint_eq(er->errpdu[3], 3);
-       ck_assert_uint_eq(er->errpdu[4], 0);
-       ck_assert_uint_eq(er->errpdu[5], 0);
-       ck_assert_uint_eq(er->errpdu[6], 0);
-       ck_assert_uint_eq(er->errpdu[7], 12);
-       ck_assert_uint_eq(er->errpdu[8], 1);
-       ck_assert_uint_eq(er->errpdu[9], 2);
-       ck_assert_uint_eq(er->errpdu[10], 3);
-       ck_assert_uint_eq(er->errpdu[11], 4);
-       ck_assert_uint_eq(er->errmsg_len, 5);
-       ck_assert_str_eq(er->errmsg, "hello");
-
-       ck_assert_uint_eq(33, pdu->raw.bytes_len);
-       ck_assert(memcmp(input, pdu->raw.bytes, 33) == 0);
-
-       rtreq_destroy(request);
+       ck_assert_uint_eq(false, pdustream_next(stream, &request));
+       ck_assert_ptr_null(request);
        pdustream_destroy(&stream);
 }
 END_TEST
 
-#define ASSERT_RQ(_rq, _version, _type, _reserved, _length)            \
-       ck_assert_uint_eq(_rq.header.version, _version);                \
-       ck_assert_uint_eq(_rq.header.type, _type);                      \
-       ck_assert_uint_eq(_rq.header.m.reserved, _reserved);            \
-       ck_assert_uint_eq(_rq.header.length, _length);
+#define ASSERT_RQ(_req, _version)                                      \
+       ck_assert_uint_eq(_req->pdu.rtr_version, _version);             \
+       ck_assert_uint_eq(_req->pdu.type, PDU_TYPE_RESET_QUERY);
 
-#define ASSERT_SQ(_sq, _version, _type, _reserved, _length, _serial)   \
-       ck_assert_uint_eq(_sq.header.version, _version);                \
-       ck_assert_uint_eq(_sq.header.type, _type);                      \
-       ck_assert_uint_eq(_sq.header.m.reserved, _reserved);            \
-       ck_assert_uint_eq(_sq.header.length, _length);                  \
-       ck_assert_uint_eq(_sq.serial_number, _serial);
+#define ASSERT_SQ(_req, _version, _type, _session, _serial)            \
+       ck_assert_uint_eq(_req->pdu.rtr_version, _version);             \
+       ck_assert_uint_eq(_req->pdu.type, PDU_TYPE_SERIAL_QUERY);       \
+       ck_assert_uint_eq(_req->pdu.obj.sq.session_id, _session);       \
+       ck_assert_uint_eq(_req->pdu.obj.sq.serial_number, _serial);
 
 START_TEST(test_multiple_pdus)
 {
@@ -241,7 +182,6 @@ START_TEST(test_multiple_pdus)
        };
        struct pdu_stream *stream;
        struct rtr_request *request;
-       struct rtr_pdu *pdu;
        int pipes[2];
 
        setup_pipes(pipes);
@@ -251,64 +191,36 @@ START_TEST(test_multiple_pdus)
        /* Input 1 */
 
        ck_assert_int_eq(32, write(pipes[1], input1, sizeof(input1)));
-       ck_assert_int_eq(0, pdustream_next(stream, &request));
+       ck_assert_uint_eq(true, pdustream_next(stream, &request));
 
        ck_assert_int_eq(stream->fd, request->fd);
        ck_assert_str_eq(stream->addr, request->client_addr);
+       ASSERT_RQ(request, RTR_V1);
+       ck_assert_uint_eq(8, request->pdu.raw.bytes_len);
+       ck_assert(memcmp(input1 + 20, request->pdu.raw.bytes, 8) == 0);
        ck_assert_uint_eq(0, request->eos);
-       assert_pdu_count(3, request);
-
-       pdu = STAILQ_FIRST(&request->pdus);
-       ASSERT_RQ(pdu->obj.rq, RTR_V1, PDU_TYPE_RESET_QUERY, 0, 8);
-       ck_assert_uint_eq(8, pdu->raw.bytes_len);
-       ck_assert(memcmp(input1 + 0, pdu->raw.bytes, 8) == 0);
-
-       pdu = STAILQ_NEXT(pdu, hook);
-       ASSERT_SQ(pdu->obj.sq, RTR_V1, PDU_TYPE_SERIAL_QUERY, 0, 12, 0x1020304);
-       ck_assert_uint_eq(12, pdu->raw.bytes_len);
-       ck_assert(memcmp(input1 + 8, pdu->raw.bytes, 12) == 0);
-
-       pdu = STAILQ_NEXT(pdu, hook);
-       ASSERT_RQ(pdu->obj.rq, RTR_V1, PDU_TYPE_RESET_QUERY, 0x304, 8);
-       ck_assert_uint_eq(8, pdu->raw.bytes_len);
-       ck_assert(memcmp(input1 + 20, pdu->raw.bytes, 8) == 0);
 
        rtreq_destroy(request);
 
        /* Input 2 */
 
        ck_assert_int_eq(12, write(pipes[1], input2, sizeof(input2)));
-       ck_assert_int_eq(0, pdustream_next(stream, &request));
+       ck_assert_uint_eq(true, pdustream_next(stream, &request));
 
        ck_assert_int_eq(stream->fd, request->fd);
        ck_assert_str_eq(stream->addr, request->client_addr);
+       ASSERT_RQ(request, RTR_V1);
+       ck_assert_uint_eq(8, request->pdu.raw.bytes_len);
+       ck_assert(memcmp(input2 + 4,  request->pdu.raw.bytes, 8) == 0);
        ck_assert_uint_eq(0, request->eos);
-       assert_pdu_count(2, request);
-
-       pdu = STAILQ_FIRST(&request->pdus);
-       ASSERT_RQ(pdu->obj.rq, RTR_V1, PDU_TYPE_RESET_QUERY, 0x304, 8);
-       ck_assert_uint_eq(8, pdu->raw.bytes_len);
-       ck_assert(memcmp(input1 + 28, &pdu->raw.bytes[0], 4) == 0);
-       ck_assert(memcmp(input2 + 0,  &pdu->raw.bytes[4], 4) == 0);
-
-       pdu = STAILQ_NEXT(pdu, hook);
-       ASSERT_RQ(pdu->obj.rq, RTR_V1, PDU_TYPE_RESET_QUERY, 0x607, 8);
-       ck_assert_uint_eq(8, pdu->raw.bytes_len);
-       ck_assert(memcmp(input2 + 4, pdu->raw.bytes, 8) == 0);
 
        rtreq_destroy(request);
 
        /* Input 3 */
 
        close(pipes[1]);
-       ck_assert_int_eq(0, pdustream_next(stream, &request));
-
-       ck_assert_int_eq(stream->fd, request->fd);
-       ck_assert_str_eq(stream->addr, request->client_addr);
-       ck_assert_uint_eq(1, request->eos);
-       assert_pdu_count(0, request);
-
-       rtreq_destroy(request);
+       ck_assert_uint_eq(false, pdustream_next(stream, &request));
+       ck_assert_ptr_null(request);
 
        /* Clean up */
 
@@ -323,14 +235,10 @@ START_TEST(test_interrupted)
        struct rtr_request *request;
 
        stream = create_stream_fd(input, sizeof(input), RTR_V1);
-       ck_assert_int_eq(0, pdustream_next(stream, &request));
 
-       ck_assert_int_eq(stream->fd, request->fd);
-       ck_assert_str_eq(stream->addr, request->client_addr);
-       ck_assert_uint_eq(1, request->eos);
-       assert_pdu_count(0, request);
+       ck_assert_uint_eq(false, pdustream_next(stream, &request));
+       ck_assert_ptr_null(request);
 
-       rtreq_destroy(request);
        pdustream_destroy(&stream);
 }
 END_TEST