From: Alberto Leiva Popper Date: Sat, 21 Oct 2023 03:08:46 +0000 (-0600) Subject: Remove PDU queuing X-Git-Tag: 1.6.0~39 X-Git-Url: http://git.ipfire.org/gitweb/gitweb.cgi?a=commitdiff_plain;h=0da59d8a8f2c77ef64bd74b5774f832737dd6d5d;p=thirdparty%2FFORT-validator.git Remove PDU queuing Tweak ideated during the commit message of the previous commit. - If the read() yields at least one Error Report, drop the connection. This is because all the server-received error codes currently defined are supposed to result in immediate connection termination. If a future RFC defines a nonfatal error code, Error Reports should probably be downgraded to the 'last PDU' rule below. - Otherwise, if a read() yields multile PDUs, drop all except for the last one. Since it's what the client is most likely expecting, I guess. Serial Queries and Reset Queries are alternate means to achieve the same goal, so it doesn't make sense to queue them. --- diff --git a/src/rtr/pdu.h b/src/rtr/pdu.h index b93947e5..f8e42e7d 100644 --- a/src/rtr/pdu.h +++ b/src/rtr/pdu.h @@ -62,34 +62,6 @@ char const *pdutype2str(enum pdu_type); #define RTRPDU_MAX_LEN2 RTRPDU_ERROR_REPORT_MAX_LEN -struct pdu_header { - enum rtr_version version; - enum pdu_type type; - union { - uint16_t session_id; - uint16_t reserved; - uint16_t error_code; - } m; /* Note: "m" stands for "meh." I have no idea what to call this. */ - uint32_t length; -}; - -struct serial_query_pdu { - struct pdu_header header; - uint32_t serial_number; -}; - -struct reset_query_pdu { - struct pdu_header header; -}; - -struct error_report_pdu { - struct pdu_header header; - uint32_t errpdu_len; - unsigned char errpdu[RTRPDU_MAX_LEN]; - uint32_t errmsg_len; - char *errmsg; -}; - static inline size_t rtrpdu_error_report_len(uint32_t errpdu_len, uint32_t errmsg_len) { diff --git a/src/rtr/pdu_handler.c b/src/rtr/pdu_handler.c index e9277b18..b8014a46 100644 --- a/src/rtr/pdu_handler.c +++ b/src/rtr/pdu_handler.c @@ -56,16 +56,14 @@ send_delta_rk(struct delta_router_key const *delta, void *arg) } int -handle_serial_query_pdu(struct rtr_request *request, struct rtr_pdu *pdu) +handle_serial_query_pdu(struct rtr_request *request) { - struct serial_query_pdu *sq; struct send_delta_args args; serial_t final_serial; int error; - sq = &pdu->obj.sq; args.fd = request->fd; - args.rtr_version = sq->header.version; + args.rtr_version = request->pdu.rtr_version; args.cache_response_sent = false; /* @@ -75,9 +73,9 @@ handle_serial_query_pdu(struct rtr_request *request, struct rtr_pdu *pdu) * the mismatch MUST immediately terminate the session with an Error * Report PDU with code 0 ("Corrupt Data")" */ - if (sq->header.m.session_id != get_current_session_id(args.rtr_version)) + if (request->pdu.obj.sq.session_id != get_current_session_id(args.rtr_version)) return err_pdu_send_corrupt_data(args.fd, args.rtr_version, - &pdu->raw, "Session ID doesn't match."); + &request->pdu.raw, "Session ID doesn't match."); /* * For the record, there are two reasons why we want to work on a @@ -88,8 +86,8 @@ handle_serial_query_pdu(struct rtr_request *request, struct rtr_pdu *pdu) * PDUs, to minimize writer stagnation. */ - error = vrps_foreach_delta_since(sq->serial_number, &final_serial, - send_delta_vrp, send_delta_rk, &args); + error = vrps_foreach_delta_since(request->pdu.obj.sq.serial_number, + &final_serial, send_delta_vrp, send_delta_rk, &args); switch (error) { case 0: /* @@ -164,7 +162,7 @@ send_base_router_key(struct router_key const *key, void *arg) } int -handle_reset_query_pdu(struct rtr_request *request, struct rtr_pdu *pdu) +handle_reset_query_pdu(struct rtr_request *request) { struct base_roa_args args; serial_t current_serial; @@ -172,7 +170,7 @@ handle_reset_query_pdu(struct rtr_request *request, struct rtr_pdu *pdu) args.started = false; args.fd = request->fd; - args.version = pdu->obj.hdr.version; + args.version = request->pdu.rtr_version; error = get_last_serial_number(¤t_serial); switch (error) { @@ -216,23 +214,3 @@ handle_reset_query_pdu(struct rtr_request *request, struct rtr_pdu *pdu) return send_end_of_data_pdu(args.fd, args.version, current_serial); } - -int -handle_error_report_pdu(struct rtr_request *request, struct rtr_pdu *pdu) -{ - struct error_report_pdu *er; - char const *error_name; - - er = &pdu->obj.er; - error_name = err_pdu_to_string(er->header.m.error_code); - - if (er->errmsg != NULL) { - pr_op_info("RTR client %s responded with error PDU '%s' ('%s'). Closing socket.", - request->client_addr, error_name, er->errmsg); - } else { - pr_op_info("RTR client %s responded with error PDU '%s'. Closing socket.", - request->client_addr, error_name); - } - - return -EINVAL; -} diff --git a/src/rtr/pdu_handler.h b/src/rtr/pdu_handler.h index 83b39b01..406827a5 100644 --- a/src/rtr/pdu_handler.h +++ b/src/rtr/pdu_handler.h @@ -3,8 +3,7 @@ #include "rtr/pdu_stream.h" -int handle_serial_query_pdu(struct rtr_request *, struct rtr_pdu *); -int handle_reset_query_pdu(struct rtr_request *, struct rtr_pdu *); -int handle_error_report_pdu(struct rtr_request *, struct rtr_pdu *); +int handle_serial_query_pdu(struct rtr_request *); +int handle_reset_query_pdu(struct rtr_request *); #endif /* SRC_RTR_PDU_HANDLER_H_ */ diff --git a/src/rtr/pdu_sender.c b/src/rtr/pdu_sender.c index 9c9424dc..41d347a9 100644 --- a/src/rtr/pdu_sender.c +++ b/src/rtr/pdu_sender.c @@ -173,7 +173,7 @@ send_router_key_pdu(int fd, uint8_t version, int send_end_of_data_pdu(int fd, uint8_t version, serial_t end_serial) { - static const uint8_t type = PDU_TYPE_ROUTER_KEY; + static const uint8_t type = PDU_TYPE_END_OF_DATA; unsigned char data[ MAX(RTRPDU_END_OF_DATA_V1_LEN, RTRPDU_END_OF_DATA_V0_LEN) ]; @@ -228,7 +228,7 @@ send_error_report_pdu(int fd, uint8_t version, uint16_t code, len = rtrpdu_error_report_len(error_pdu_len, error_msg_len); data = pmalloc(len); - buf = serialize_hdr(data, version, type, 0, len); + buf = serialize_hdr(data, version, type, code, len); buf = write_uint32(buf, error_pdu_len); if (error_pdu_len > 0) { memcpy(buf, request->bytes, error_pdu_len); diff --git a/src/rtr/pdu_stream.c b/src/rtr/pdu_stream.c index b2fb6090..066ad7ce 100644 --- a/src/rtr/pdu_stream.c +++ b/src/rtr/pdu_stream.c @@ -31,6 +31,17 @@ struct pdu_stream { unsigned char *end; }; +struct pdu_header { + enum rtr_version version; + enum pdu_type type; + union { + uint16_t session_id; + uint16_t reserved; + uint16_t error_code; + } m; /* Note: "m" stands for "meh." I have no idea what to call this. */ + uint32_t len; +}; + struct pdu_stream *pdustream_create(int fd, char const *addr) { struct pdu_stream *result; @@ -88,12 +99,14 @@ update_buffer(struct pdu_stream *in /* "in"put stream */) consumed = read(in->fd, in->end, RTRPDU_MAX_LEN2 - get_length(in)); if (consumed == -1) { error = errno; - if (error == EAGAIN || error == EWOULDBLOCK) + if (error == EAGAIN || error == EWOULDBLOCK) { + pr_op_debug("Reached stream limit for now."); return BS_WOULD_BLOCK; - - pr_op_err("Client socket read interrupted: %s", - strerror(error)); - return BS_ERROR; + } else { + pr_op_err("Client socket read interrupted: %s", + strerror(error)); + return BS_ERROR; + } } if (consumed == 0) { @@ -107,6 +120,7 @@ update_buffer(struct pdu_stream *in /* "in"put stream */) * big PDU that either lengths exactly RTRPDU_MAX_LEN2, or is too big * for us to to allow it. */ + pr_op_debug("Stream limit not reached yet."); return BS_KEEP_READING; } @@ -217,7 +231,7 @@ read_hdr(struct pdu_stream *stream, struct pdu_header *header) header->version = stream->start[0]; header->type = stream->start[1]; header->m.reserved = read_uint16(stream->start + 2); - header->length = read_uint32(stream->start + 4); + header->len = read_uint32(stream->start + 4); } static int @@ -273,124 +287,154 @@ unexpected: static int load_serial_query(struct pdu_stream *stream, struct pdu_header *hdr, - struct rtr_pdu *result) + struct rtr_request *result) { - if (hdr->length != RTRPDU_SERIAL_QUERY_LEN) { + size_t length; + + if (hdr->len != RTRPDU_SERIAL_QUERY_LEN) { + pr_op_err("%s: Header length is not %u: %u", + stream->addr, RTRPDU_SERIAL_QUERY_LEN, hdr->len); return err_pdu_send_invalid_request( - stream->fd, stream->rtr_version, &result->raw, + stream->fd, stream->rtr_version, &result->pdu.raw, "Expected length 12 for Serial Query PDUs." ); } - if (get_length(stream) < RTRPDU_SERIAL_QUERY_LEN) + + length = get_length(stream); + if (length < RTRPDU_SERIAL_QUERY_LEN) { + pr_op_debug("PDU fragmented after hdr (%zu)", length); return EAGAIN; + } pr_op_debug("Received a Serial Query from %s.", stream->addr); - memcpy(&result->obj.sq.header, hdr, sizeof(*hdr)); + result->pdu.obj.sq.session_id = hdr->m.session_id; stream->start += RTR_HDR_LEN; - result->obj.sq.serial_number = read_uint32(stream->start); + result->pdu.obj.sq.serial_number = read_uint32(stream->start); stream->start += 4; - return 0; } static int load_reset_query(struct pdu_stream *stream, struct pdu_header *hdr, - struct rtr_pdu *result) + struct rtr_request *result) { - if (hdr->length != RTRPDU_RESET_QUERY_LEN) { + size_t length; + + if (hdr->len != RTRPDU_RESET_QUERY_LEN) { + pr_op_err("%s: Header length is not %u: %u", + stream->addr, RTRPDU_RESET_QUERY_LEN, hdr->len); return err_pdu_send_invalid_request( - stream->fd, stream->rtr_version, &result->raw, + stream->fd, stream->rtr_version, &result->pdu.raw, "Expected length 8 for Reset Query PDUs." ); } - if (get_length(stream) < RTRPDU_RESET_QUERY_LEN) + + length = get_length(stream); + if (length < RTRPDU_RESET_QUERY_LEN) { + pr_op_debug("PDU fragmented after hdr (%zu)", length); return EAGAIN; + } pr_op_debug("Received a Reset Query from %s.", stream->addr); - memcpy(&result->obj.rq.header, hdr, sizeof(*hdr)); stream->start += RTR_HDR_LEN; - return 0; } +static void +handle_error_report_pdu(uint16_t errcode, char const *errmsg, + char const *client_addr) +{ + if (errmsg != NULL) { + pr_op_err("RTR client %s responded with error PDU '%s' ('%s'). Closing socket.", + client_addr, err_pdu_to_string(errcode), errmsg); + } else { + pr_op_err("RTR client %s responded with error PDU '%s'. Closing socket.", + client_addr, err_pdu_to_string(errcode)); + } +} + + static int -load_error_report(struct pdu_stream *stream, struct pdu_header *hdr, - struct rtr_pdu *result) +load_error_report(struct pdu_stream *stream, struct pdu_header *hdr) { - struct error_report_pdu *pdu; + uint32_t errpdu_len; + uint32_t errmsg_len; + char *errmsg; int error; - if (hdr->length > RTRPDU_ERROR_REPORT_MAX_LEN) { + if (hdr->len > RTRPDU_ERROR_REPORT_MAX_LEN) { return pr_op_err( "RTR client %s sent a large Error Report PDU (%u bytes). This looks broken, so I'm dropping the connection.", - stream->addr, hdr->length + stream->addr, hdr->len ); } pr_op_debug("Received an Error Report from %s.", stream->addr); - pdu = &result->obj.er; - /* Header */ - memcpy(&pdu->header, hdr, sizeof(*hdr)); stream->start += RTR_HDR_LEN; /* Error PDU length */ if (get_length(stream) < 4) { + pr_op_debug("Fragmented on error PDU length."); error = EAGAIN; goto revert_hdr; } - pdu->errpdu_len = read_uint32(stream->start); + errpdu_len = read_uint32(stream->start); stream->start += 4; - if (pdu->errpdu_len > RTRPDU_MAX_LEN) { + if (errpdu_len > RTRPDU_MAX_LEN) { /* * We truncate PDUs larger than RTRPDU_MAX_LEN, so we couldn't * have sent this PDU. Looks like someone is messing with us. */ error = pr_op_err( "RTR client %s sent an Error Report PDU containing a large error PDU (%u bytes). This looks broken/insecure; I'm dropping the connection.", - stream->addr, pdu->errpdu_len + stream->addr, errpdu_len ); goto revert_errpdu_len; } /* Error PDU */ - if (get_length(stream) < pdu->errpdu_len) { + if (get_length(stream) < errpdu_len) { + pr_op_debug("Fragmented on error PDU."); error = EAGAIN; goto revert_errpdu_len; } - memcpy(pdu->errpdu, stream->start, pdu->errpdu_len); - stream->start += pdu->errpdu_len; + stream->start += errpdu_len; /* Skip it for now; we don't use it */ /* Error msg length */ if (get_length(stream) < 4) { + pr_op_debug("Fragmented on error message length."); error = EAGAIN; goto revert_errpdu; } - pdu->errmsg_len = read_uint32(stream->start); + errmsg_len = read_uint32(stream->start); stream->start += 4; - if (hdr->length != rtrpdu_error_report_len(pdu->errpdu_len, pdu->errmsg_len)) { + if (hdr->len != rtrpdu_error_report_len(errpdu_len, errmsg_len)) { error = pr_op_err( "RTR client %s sent a malformed Error Report PDU; header length is %u, but effective length is %u + %u + %u + %u + %u.", - stream->addr, hdr->length, - RTR_HDR_LEN, 4, pdu->errpdu_len, 4, pdu->errmsg_len + stream->addr, hdr->len, + RTR_HDR_LEN, 4, errpdu_len, 4, errmsg_len ); goto revert_errmsg_len; } /* Error msg */ - pdu->errmsg = read_string(stream, pdu->errmsg_len); - stream->start += pdu->errmsg_len; + errmsg = read_string(stream, errmsg_len); + stream->start += errmsg_len; - return 0; + handle_error_report_pdu(hdr->m.error_code, errmsg, stream->addr); + + free(errmsg); + return EINVAL; revert_errmsg_len: stream->start -= 4; revert_errpdu: - stream->start -= pdu->errpdu_len; + stream->start -= errpdu_len; revert_errpdu_len: stream->start -= 4; revert_hdr: @@ -398,119 +442,162 @@ revert_hdr: return error; } +static struct rtr_request * +create_request(struct pdu_stream *stream, struct pdu_header *hdr, + struct rtr_buffer *raw) +{ + struct rtr_request *result; + + result = pmalloc(sizeof(struct rtr_request)); + result->fd = stream->fd; + strcpy(result->client_addr, stream->addr); + result->pdu.rtr_version = hdr->version; + result->pdu.type = hdr->type; + result->pdu.raw = *raw; + result->eos = false; + + return result; +} + /* + * Returns the next "usable" PDU in the stream. Does not block. + * + * I call it "usable" because there can technically be multiple PDUs in the + * stream, and if that happens, it doesn't make sense to handle all of them + * sequentially. So there's no queuing. + * + * If there is at least one Error Report, it'll induce end of stream. This is + * because all the currently defined client-sourced Error Reports are fatal. + * The caller does not need to concern itself with handling Error Reports. + * + * Otherwise, if there are multiple PDUs in the stream, the last one will be + * returned, and the rest will be ignored. + * This is because Serial Queries and Reset Queries are the only non-error PDUs + * the server is supposed to receive, and they serve the same purpose. If the + * client sent two of them (maybe because we took too long to respond for some + * reason), it's most likely given up on the old one. + * + * Just to be clear, most of this is probably just paranoid error handling; a + * well-behaving client will never send us multiple PDUs in quick succession. + * But we don't know for sure how much buffering the underlying socket does, + * so we want to do something sensible if it happens. + * * Returns: - * == 0: Success; at least zero PDUs read. - * != 0: Communication broken; close the connection. + * true: Success. @result might or might not point to a PDU; check NULL. + * false: Communication ended or broken; close the connection, ignore @result. */ -int -pdustream_next(struct pdu_stream *stream, struct rtr_request **_result) +bool +pdustream_next(struct pdu_stream *stream, struct rtr_request **result) { enum buffer_state state; struct pdu_header hdr; - struct rtr_request *result; - struct rtr_pdu *pdu; + struct rtr_buffer raw = { 0 }; + struct rtr_request *request = NULL; + struct rtr_request *request_tmp; size_t remainder; int error; - result = pmalloc(sizeof(struct rtr_request)); - result->fd = stream->fd; - strcpy(result->client_addr, stream->addr); - STAILQ_INIT(&result->pdus); - result->eos = false; - - pdu = NULL; + *result = NULL; again: state = update_buffer(stream); - if (state == BS_ERROR) { - error = EINVAL; - goto fail; - } + if (state == BS_ERROR) + return false; while (stream->start < stream->end) { + request_tmp = NULL; remainder = get_length(stream); /* Read header. */ - if (remainder < RTR_HDR_LEN) - break; + if (remainder < RTR_HDR_LEN) { + pr_op_debug("PDU fragmented on header (%zu)", remainder); + break; /* PDU is fragmented */ + } read_hdr(stream, &hdr); /* Init raw PDU; Needed early because of error responses. */ - pdu = pzalloc(sizeof(struct rtr_pdu)); - pdu->raw.bytes_len = (hdr.length <= remainder) - ? hdr.length : remainder; - pdu->raw.bytes = pmalloc(pdu->raw.bytes_len); - memcpy(pdu->raw.bytes, stream->start, pdu->raw.bytes_len); + raw.bytes_len = (hdr.len <= remainder) ? hdr.len : remainder; + raw.bytes = pmalloc(raw.bytes_len); + memcpy(raw.bytes, stream->start, raw.bytes_len); /* Validate length; Needs raw. */ - if (hdr.length > RTRPDU_MAX_LEN2) { - error = err_pdu_send_invalid_request( + if (hdr.len > RTRPDU_MAX_LEN2) { + pr_op_err("%s: Header length too big: %u > %u", + stream->addr, hdr.len, RTRPDU_MAX_LEN2); + err_pdu_send_invalid_request( stream->fd, (stream->rtr_version != -1) ? stream->rtr_version : hdr.version, - &pdu->raw, + &raw, "PDU is too large." ); goto fail; } - if (remainder < hdr.length) { - free(pdu->raw.bytes); - free(pdu); - break; - } - /* Validate version; Needs raw. */ - error = validate_rtr_version(stream, &hdr, &pdu->raw); - if (error) + if (validate_rtr_version(stream, &hdr, &raw) != 0) { + pr_op_err("%s: Bad RTR version: %u", + stream->addr, hdr.version); goto fail; + } + + request_tmp = create_request(stream, &hdr, &raw); + raw.bytes = NULL; /* Ownership transferred */ switch (hdr.type) { case PDU_TYPE_SERIAL_QUERY: - error = load_serial_query(stream, &hdr, pdu); + error = load_serial_query(stream, &hdr, request_tmp); break; case PDU_TYPE_RESET_QUERY: - error = load_reset_query(stream, &hdr, pdu); + error = load_reset_query(stream, &hdr, request_tmp); break; case PDU_TYPE_ERROR_REPORT: - error = load_error_report(stream, &hdr, pdu); + error = load_error_report(stream, &hdr); break; default: + pr_op_err("%s: Unknown PDU type: %u", + stream->addr, hdr.version); err_pdu_send_unsupported_pdu_type(stream->fd, - stream->rtr_version, &pdu->raw); - error = ENOTSUP; + stream->rtr_version, &request_tmp->pdu.raw); + goto fail; } - if (error) + if (error == EAGAIN) { + rtreq_destroy(request_tmp); + break; + } else if (error) { goto fail; + } - STAILQ_INSERT_TAIL(&result->pdus, pdu, hook); + if (request != NULL) + rtreq_destroy(request); + request = request_tmp; } - *_result = result; - switch (state) { - case BS_WOULD_BLOCK: - result->eos = false; - return 0; case BS_EOS: - result->eos = true; - return 0; + if (request == NULL) + return false; + request->eos = true; + /* Fall through */ + case BS_WOULD_BLOCK: + *result = request; + return true; case BS_KEEP_READING: goto again; - default: - error = EINVAL; + case BS_ERROR: + pr_crit("This should have been catched earlier."); } fail: - if (pdu != NULL) { - free(pdu->raw.bytes); - free(pdu); - } - rtreq_destroy(result); - return error; + if (request != NULL) + rtreq_destroy(request); + if (request_tmp != NULL) + rtreq_destroy(request_tmp); + if (raw.bytes != NULL) + free(raw.bytes); + return false; } int @@ -534,16 +621,7 @@ pdustream_version(struct pdu_stream *stream) void rtreq_destroy(struct rtr_request *request) { - struct rtr_pdu *pdu; - - while (!STAILQ_EMPTY(&request->pdus)) { - pdu = STAILQ_FIRST(&request->pdus); - STAILQ_REMOVE_HEAD(&request->pdus, hook); - - if (pdu->obj.hdr.type == PDU_TYPE_ERROR_REPORT) - free(pdu->obj.er.errmsg); - free(pdu->raw.bytes); - free(pdu); - } + free(request->pdu.raw.bytes); + free(request); } diff --git a/src/rtr/pdu_stream.h b/src/rtr/pdu_stream.h index db66ea23..e0f279e9 100644 --- a/src/rtr/pdu_stream.h +++ b/src/rtr/pdu_stream.h @@ -1,41 +1,34 @@ #ifndef SRC_RTR_PDU_STREAM_H_ #define SRC_RTR_PDU_STREAM_H_ -#include - #include "rtr/pdu.h" #include "rtr/rtr.h" #include "data_structure/array_list.h" struct pdu_stream; /* It's an *input* stream. */ -struct rtr_pdu { - /* Deserialized version */ - union { - struct pdu_header hdr; - struct serial_query_pdu sq; - struct reset_query_pdu rq; - struct error_report_pdu er; - } obj; - - /* - * Serialized version. - * Can be truncated; use for responding errors only. - */ - struct rtr_buffer raw; - - STAILQ_ENTRY(rtr_pdu) hook; -}; - struct rtr_request { int fd; char client_addr[INET6_ADDRSTRLEN]; - /* - * It's not sensible for a request to contain multiple PDUs, - * but I don't know how much buffering the underlying socket has. - */ - STAILQ_HEAD(, rtr_pdu) pdus; + struct { + enum rtr_version rtr_version; + enum pdu_type type; + + /* Deserialized version */ + union { + struct { + uint16_t session_id; + uint32_t serial_number; + } sq; /* Serial Query */ + } obj; + + /* + * Serialized version. + * Can be truncated; use for responding errors only. + */ + struct rtr_buffer raw; + } pdu; bool eos; /* end of stream */ }; @@ -43,7 +36,7 @@ struct rtr_request { struct pdu_stream *pdustream_create(int, char const *); void pdustream_destroy(struct pdu_stream **); -int pdustream_next(struct pdu_stream *, struct rtr_request **); +bool pdustream_next(struct pdu_stream *, struct rtr_request **); int pdustream_fd(struct pdu_stream *); char const *pdustream_addr(struct pdu_stream *); int pdustream_version(struct pdu_stream *); diff --git a/src/rtr/rtr.c b/src/rtr/rtr.c index 908b9cf0..05b1fc04 100644 --- a/src/rtr/rtr.c +++ b/src/rtr/rtr.c @@ -318,27 +318,21 @@ static void handle_client_request(void *arg) { struct rtr_request *request = arg; - struct rtr_pdu *pdu; - STAILQ_FOREACH(pdu, &request->pdus, hook) { - switch (pdu->obj.hdr.type) { - case PDU_TYPE_SERIAL_QUERY: - handle_serial_query_pdu(request, pdu); - break; - case PDU_TYPE_RESET_QUERY: - handle_reset_query_pdu(request, pdu); - break; - case PDU_TYPE_ERROR_REPORT: - handle_error_report_pdu(request, pdu); - break; - default: - /* Should have been catched during constructor */ - pr_crit("Unexpected PDU type: %u", pdu->obj.hdr.type); - } + switch (request->pdu.type) { + case PDU_TYPE_SERIAL_QUERY: + handle_serial_query_pdu(request); + break; + case PDU_TYPE_RESET_QUERY: + handle_reset_query_pdu(request); + break; + default: + /* Should have been catched during constructor */ + pr_crit("Unexpected PDU type: %u", request->pdu.type); } if (request->eos) - /* Wake poller to close the socket. Read side already shut. */ + /* Wake poller to close the socket */ shutdown(request->fd, SHUT_WR); rtreq_destroy(request); @@ -442,16 +436,12 @@ static bool __handle_client_request(struct pdu_stream *stream) { struct rtr_request *request; - bool eos; - if (pdustream_next(stream, &request) != 0) + if (!pdustream_next(stream, &request)) return false; - if (STAILQ_EMPTY(&request->pdus)) { - eos = request->eos; - free(request); - return !eos; - } + if (request == NULL) + return true; thread_pool_push(request_handlers, "RTR request", handle_client_request, request); @@ -563,7 +553,7 @@ fddb_poll(void) } } - /* The servers might change this number, so store a backup. */ + /* accept_new_client() might change this number, so store a backup. */ nclients = clients.len; /* New connections */ @@ -599,9 +589,6 @@ fddb_poll(void) /* PR_DEBUG_MSG("Client %u: fd:%d revents:%x", i, fd->fd, fd->revents); */ -// if (fd->fd == -1) -// continue; - if (fd->revents & (POLLHUP | POLLERR | POLLNVAL)) { fd->fd = -1; } else if (fd->revents & POLLIN) { diff --git a/test/rtr/pdu_handler_test.c b/test/rtr/pdu_handler_test.c index 48b9c921..e092ee60 100644 --- a/test/rtr/pdu_handler_test.c +++ b/test/rtr/pdu_handler_test.c @@ -105,43 +105,33 @@ init_db_full(void) } static void -init_request(struct rtr_request *request, struct rtr_pdu *pdu) -{ - request->fd = 0; - strcpy(request->client_addr, "192.0.2.1"); - STAILQ_INIT(&request->pdus); - if (pdu != NULL) - STAILQ_INSERT_TAIL(&request->pdus, pdu, hook); - request->eos = false; -} - -static void -init_reset_query(struct rtr_pdu *pdu) +init_reset_query(struct rtr_request *request) { static unsigned char raw[] = { 1, 2, 0, 0, 0, 0, 0, 8 }; - pdu->obj.rq.header.version = RTR_V1; - pdu->obj.rq.header.type = PDU_TYPE_RESET_QUERY; - pdu->obj.rq.header.m.reserved = 0; - pdu->obj.rq.header.length = 8; - pdu->raw.bytes = raw; - pdu->raw.bytes_len = sizeof(raw); - memset(&pdu->hook, 0, sizeof(pdu->hook)); + request->fd = 0; + strcpy(request->client_addr, "192.0.2.1"); + request->pdu.rtr_version = RTR_V1; + request->pdu.type = PDU_TYPE_RESET_QUERY; + request->pdu.raw.bytes = raw; + request->pdu.raw.bytes_len = sizeof(raw); + request->eos = true; } static void -init_serial_query(struct rtr_pdu *pdu, uint32_t serial) +init_serial_query(struct rtr_request *request, uint32_t serial) { static unsigned char raw[] = { 1, 1, 0, 0, 0, 0, 0, 12, 0, 0, 0, 0 }; - pdu->obj.sq.header.version = RTR_V1; - pdu->obj.sq.header.type = PDU_TYPE_SERIAL_QUERY; - pdu->obj.sq.header.m.session_id = get_current_session_id(RTR_V1); - pdu->obj.sq.header.length = 12; - pdu->obj.sq.serial_number = serial; - pdu->raw.bytes = raw; - pdu->raw.bytes_len = sizeof(raw); - memset(&pdu->hook, 0, sizeof(pdu->hook)); + request->fd = 0; + strcpy(request->client_addr, "192.0.2.1"); + request->pdu.rtr_version = RTR_V1; + request->pdu.type = PDU_TYPE_SERIAL_QUERY; + request->pdu.obj.sq.session_id = get_current_session_id(RTR_V1); + request->pdu.obj.sq.serial_number = serial; + request->pdu.raw.bytes = raw; + request->pdu.raw.bytes_len = sizeof(raw); + request->eos = true; } /* Mocks */ @@ -253,14 +243,12 @@ send_error_report_pdu(int fd, uint8_t version, uint16_t code, START_TEST(test_start_or_restart) { struct rtr_request request; - struct rtr_pdu client_pdu; pr_op_info("-- Start or Restart --"); /* Init */ init_db_full(); - init_reset_query(&client_pdu); - init_request(&request, &client_pdu); + init_reset_query(&request); /* Define expected server response */ expected_pdu_add(PDU_TYPE_CACHE_RESPONSE); @@ -270,7 +258,7 @@ START_TEST(test_start_or_restart) expected_pdu_add(PDU_TYPE_END_OF_DATA); /* Run and validate */ - ck_assert_int_eq(0, handle_reset_query_pdu(&request, &client_pdu)); + ck_assert_int_eq(0, handle_reset_query_pdu(&request)); ck_assert_uint_eq(false, has_expected_pdus()); /* Clean up */ @@ -282,25 +270,23 @@ END_TEST START_TEST(test_typical_exchange) { struct rtr_request request; - struct rtr_pdu client_pdu; pr_op_info("-- Typical Exchange --"); /* Init */ init_db_full(); - init_serial_query(&client_pdu, 0); - init_request(&request, &client_pdu); + init_serial_query(&request, 0); /* From serial 0: Define expected server response */ /* Server doesn't have serial 0. */ expected_pdu_add(PDU_TYPE_CACHE_RESET); /* From serial 0: Run and validate */ - ck_assert_int_eq(0, handle_serial_query_pdu(&request, &client_pdu)); + ck_assert_int_eq(0, handle_serial_query_pdu(&request)); ck_assert_uint_eq(false, has_expected_pdus()); /* From serial 1: Init client request */ - init_serial_query(&client_pdu, 1); + init_serial_query(&request, 1); /* From serial 1: Define expected server response */ expected_pdu_add(PDU_TYPE_CACHE_RESPONSE); @@ -313,11 +299,11 @@ START_TEST(test_typical_exchange) expected_pdu_add(PDU_TYPE_END_OF_DATA); /* From serial 1: Run and validate */ - ck_assert_int_eq(0, handle_serial_query_pdu(&request, &client_pdu)); + ck_assert_int_eq(0, handle_serial_query_pdu(&request)); ck_assert_uint_eq(false, has_expected_pdus()); /* From serial 2: Init client request */ - init_serial_query(&client_pdu, 2); + init_serial_query(&request, 2); /* From serial 2: Define expected server response */ expected_pdu_add(PDU_TYPE_CACHE_RESPONSE); @@ -327,18 +313,18 @@ START_TEST(test_typical_exchange) expected_pdu_add(PDU_TYPE_END_OF_DATA); /* From serial 2: Run and validate */ - ck_assert_int_eq(0, handle_serial_query_pdu(&request, &client_pdu)); + ck_assert_int_eq(0, handle_serial_query_pdu(&request)); ck_assert_uint_eq(false, has_expected_pdus()); /* From serial 3: Init client request */ - init_serial_query(&client_pdu, 3); + init_serial_query(&request, 3); /* From serial 3: Define expected server response */ expected_pdu_add(PDU_TYPE_CACHE_RESPONSE); expected_pdu_add(PDU_TYPE_END_OF_DATA); /* From serial 3: Run and validate */ - ck_assert_int_eq(0, handle_serial_query_pdu(&request, &client_pdu)); + ck_assert_int_eq(0, handle_serial_query_pdu(&request)); ck_assert_uint_eq(false, has_expected_pdus()); /* Clean up */ @@ -350,20 +336,18 @@ END_TEST START_TEST(test_no_incremental_update_available) { struct rtr_request request; - struct rtr_pdu client_pdu; pr_op_info("-- No Incremental Update Available --"); /* Init */ init_db_full(); - init_serial_query(&client_pdu, 10000); - init_request(&request, &client_pdu); + init_serial_query(&request, 10000); /* Define expected server response */ expected_pdu_add(PDU_TYPE_CACHE_RESET); /* Run and validate */ - ck_assert_int_eq(0, handle_serial_query_pdu(&request, &client_pdu)); + ck_assert_int_eq(0, handle_serial_query_pdu(&request)); ck_assert_uint_eq(false, has_expected_pdus()); /* The Reset Query is already tested in start_or_restart. */ @@ -377,38 +361,31 @@ END_TEST START_TEST(test_cache_has_no_data_available) { struct rtr_request request; - struct rtr_pdu serial_query; - struct rtr_pdu reset_query; pr_op_info("-- Cache Has No Data Available --"); /* Init */ ck_assert_int_eq(0, vrps_init()); - init_request(&request, NULL); /* Serial Query: Init client request */ - init_serial_query(&serial_query, 0); - STAILQ_INSERT_TAIL(&request.pdus, &serial_query, hook); + init_serial_query(&request, 0); /* Serial Query: Define expected server response */ expected_pdu_add(PDU_TYPE_ERROR_REPORT); /* Serial Query: Run and validate */ - ck_assert_int_eq(0, handle_serial_query_pdu(&request, &serial_query)); + ck_assert_int_eq(0, handle_serial_query_pdu(&request)); ck_assert_uint_eq(false, has_expected_pdus()); - STAILQ_REMOVE_HEAD(&request.pdus, hook); /* Reset Query: Init client request */ - init_reset_query(&reset_query); - STAILQ_INSERT_TAIL(&request.pdus, &reset_query, hook); + init_reset_query(&request); /* Reset Query: Define expected server response */ expected_pdu_add(PDU_TYPE_ERROR_REPORT); /* Reset Query: Run and validate */ - ck_assert_int_eq(0, handle_reset_query_pdu(&request, &reset_query)); + ck_assert_int_eq(0, handle_reset_query_pdu(&request)); ck_assert_uint_eq(false, has_expected_pdus()); - STAILQ_REMOVE_HEAD(&request.pdus, hook); /* Clean up */ vrps_destroy(); @@ -418,21 +395,19 @@ END_TEST START_TEST(test_bad_session_id) { struct rtr_request request; - struct rtr_pdu client_pdu; pr_op_info("-- Bad Session ID --"); /* Init */ init_db_full(); - init_serial_query(&client_pdu, 0); - client_pdu.obj.sq.header.m.session_id++; - init_request(&request, &client_pdu); + init_serial_query(&request, 0); + request.pdu.obj.sq.session_id++; /* From serial 0: Define expected server response */ expected_pdu_add(PDU_TYPE_ERROR_REPORT); /* From serial 0: Run and validate */ - ck_assert_int_eq(-EINVAL, handle_serial_query_pdu(&request, &client_pdu)); + ck_assert_int_eq(-EINVAL, handle_serial_query_pdu(&request)); ck_assert_uint_eq(false, has_expected_pdus()); /* Clean up */ diff --git a/test/rtr/pdu_stream_test.c b/test/rtr/pdu_stream_test.c index a3b52713..073479f1 100644 --- a/test/rtr/pdu_stream_test.c +++ b/test/rtr/pdu_stream_test.c @@ -17,6 +17,13 @@ MOCK_ABORT_INT(err_pdu_send_unsupported_pdu_type, int fd, uint8_t version, MOCK_ABORT_INT(err_pdu_send_unexpected_proto_version, int fd, uint8_t version, struct rtr_buffer const *request, char const *msg) +char const * +err_pdu_to_string(uint16_t c) +{ + ck_assert_uint_eq(c, 0x1617); + return "achoo"; +} + /* End of mocks */ static void @@ -54,18 +61,6 @@ create_stream_fd(unsigned char *data, size_t datalen, int rtr_version) return result; } -static void -assert_pdu_count(unsigned int expected, struct rtr_request *request) -{ - struct rtr_pdu *pdu; - unsigned int npdu; - - npdu = 0; - STAILQ_FOREACH(pdu, &request->pdus, hook) - npdu++; - ck_assert_uint_eq(expected, npdu); -} - START_TEST(test_pdu_header_from_stream) { unsigned char input[] = { 0, 1, 2, 3, 4, 5, 6, 7 }; @@ -78,7 +73,7 @@ START_TEST(test_pdu_header_from_stream) ck_assert_uint_eq(hdr.version, 0); ck_assert_uint_eq(hdr.type, 1); ck_assert_uint_eq(hdr.m.reserved, 0x0203); - ck_assert_uint_eq(hdr.length, 0x04050607); + ck_assert_uint_eq(hdr.len, 0x04050607); free(stream); } @@ -94,25 +89,19 @@ START_TEST(test_serial_query_from_stream) }; struct pdu_stream *stream; struct rtr_request *request; - struct rtr_pdu *pdu; - struct serial_query_pdu *sq; stream = create_stream_fd(input, sizeof(input), RTR_V1); - ck_assert_int_eq(0, pdustream_next(stream, &request)); + ck_assert_uint_eq(true, pdustream_next(stream, &request)); ck_assert_int_eq(stream->fd, request->fd); ck_assert_str_eq(stream->addr, request->client_addr); + ck_assert_uint_eq(RTR_V1, request->pdu.rtr_version); + ck_assert_uint_eq(PDU_TYPE_SERIAL_QUERY, request->pdu.type); + ck_assert_uint_eq(0x0e0f1011, request->pdu.obj.sq.serial_number); + ck_assert_uint_eq(0x0708, request->pdu.obj.sq.session_id); + ck_assert_uint_eq(12, request->pdu.raw.bytes_len); + ck_assert(memcmp(input, request->pdu.raw.bytes, 12) == 0); ck_assert_uint_eq(1, request->eos); - assert_pdu_count(1, request); - - pdu = STAILQ_FIRST(&request->pdus); - sq = &pdu->obj.sq; - - ck_assert_uint_eq(sq->header.version, RTR_V1); - ck_assert_uint_eq(sq->header.type, PDU_TYPE_SERIAL_QUERY); - ck_assert_uint_eq(sq->header.m.reserved, 0x0708); - ck_assert_uint_eq(sq->header.length, 12); - ck_assert_uint_eq(sq->serial_number, 0x0e0f1011); rtreq_destroy(request); pdustream_destroy(&stream); @@ -127,27 +116,16 @@ START_TEST(test_reset_query_from_stream) }; struct pdu_stream *stream; struct rtr_request *request; - struct rtr_pdu *pdu; - struct reset_query_pdu *rq; stream = create_stream_fd(input, sizeof(input), RTR_V0); - ck_assert_int_eq(0, pdustream_next(stream, &request)); - + ck_assert_uint_eq(true, pdustream_next(stream, &request)); ck_assert_int_eq(stream->fd, request->fd); ck_assert_str_eq(stream->addr, request->client_addr); + ck_assert_uint_eq(RTR_V0, request->pdu.rtr_version); + ck_assert_uint_eq(PDU_TYPE_RESET_QUERY, request->pdu.type); + ck_assert_uint_eq(8, request->pdu.raw.bytes_len); + ck_assert(memcmp(input, request->pdu.raw.bytes, 8) == 0); ck_assert_uint_eq(1, request->eos); - assert_pdu_count(1, request); - - pdu = STAILQ_FIRST(&request->pdus); - rq = &pdu->obj.rq; - - ck_assert_uint_eq(rq->header.version, RTR_V0); - ck_assert_uint_eq(rq->header.type, PDU_TYPE_RESET_QUERY); - ck_assert_uint_eq(rq->header.m.reserved, 0x0c0d); - ck_assert_uint_eq(rq->header.length, 8); - - ck_assert_uint_eq(8, pdu->raw.bytes_len); - ck_assert(memcmp(input, pdu->raw.bytes, 8) == 0); rtreq_destroy(request); pdustream_destroy(&stream); @@ -172,60 +150,23 @@ START_TEST(test_error_report_from_stream) }; struct pdu_stream *stream; struct rtr_request *request; - struct rtr_pdu *pdu; - struct error_report_pdu *er; stream = create_stream_fd(input, sizeof(input), RTR_V1); - ck_assert_int_eq(0, pdustream_next(stream, &request)); - - ck_assert_int_eq(stream->fd, request->fd); - ck_assert_str_eq(stream->addr, request->client_addr); - ck_assert_uint_eq(1, request->eos); - assert_pdu_count(1, request); - - pdu = STAILQ_FIRST(&request->pdus); - er = &pdu->obj.er; - - ck_assert_uint_eq(er->header.version, RTR_V1); - ck_assert_uint_eq(er->header.type, PDU_TYPE_ERROR_REPORT); - ck_assert_uint_eq(er->header.m.reserved, 0x1617); - ck_assert_uint_eq(er->header.length, 33); - ck_assert_uint_eq(er->errpdu_len, 12); - ck_assert_uint_eq(er->errpdu[0], 1); - ck_assert_uint_eq(er->errpdu[1], 0); - ck_assert_uint_eq(er->errpdu[2], 2); - ck_assert_uint_eq(er->errpdu[3], 3); - ck_assert_uint_eq(er->errpdu[4], 0); - ck_assert_uint_eq(er->errpdu[5], 0); - ck_assert_uint_eq(er->errpdu[6], 0); - ck_assert_uint_eq(er->errpdu[7], 12); - ck_assert_uint_eq(er->errpdu[8], 1); - ck_assert_uint_eq(er->errpdu[9], 2); - ck_assert_uint_eq(er->errpdu[10], 3); - ck_assert_uint_eq(er->errpdu[11], 4); - ck_assert_uint_eq(er->errmsg_len, 5); - ck_assert_str_eq(er->errmsg, "hello"); - - ck_assert_uint_eq(33, pdu->raw.bytes_len); - ck_assert(memcmp(input, pdu->raw.bytes, 33) == 0); - - rtreq_destroy(request); + ck_assert_uint_eq(false, pdustream_next(stream, &request)); + ck_assert_ptr_null(request); pdustream_destroy(&stream); } END_TEST -#define ASSERT_RQ(_rq, _version, _type, _reserved, _length) \ - ck_assert_uint_eq(_rq.header.version, _version); \ - ck_assert_uint_eq(_rq.header.type, _type); \ - ck_assert_uint_eq(_rq.header.m.reserved, _reserved); \ - ck_assert_uint_eq(_rq.header.length, _length); +#define ASSERT_RQ(_req, _version) \ + ck_assert_uint_eq(_req->pdu.rtr_version, _version); \ + ck_assert_uint_eq(_req->pdu.type, PDU_TYPE_RESET_QUERY); -#define ASSERT_SQ(_sq, _version, _type, _reserved, _length, _serial) \ - ck_assert_uint_eq(_sq.header.version, _version); \ - ck_assert_uint_eq(_sq.header.type, _type); \ - ck_assert_uint_eq(_sq.header.m.reserved, _reserved); \ - ck_assert_uint_eq(_sq.header.length, _length); \ - ck_assert_uint_eq(_sq.serial_number, _serial); +#define ASSERT_SQ(_req, _version, _type, _session, _serial) \ + ck_assert_uint_eq(_req->pdu.rtr_version, _version); \ + ck_assert_uint_eq(_req->pdu.type, PDU_TYPE_SERIAL_QUERY); \ + ck_assert_uint_eq(_req->pdu.obj.sq.session_id, _session); \ + ck_assert_uint_eq(_req->pdu.obj.sq.serial_number, _serial); START_TEST(test_multiple_pdus) { @@ -241,7 +182,6 @@ START_TEST(test_multiple_pdus) }; struct pdu_stream *stream; struct rtr_request *request; - struct rtr_pdu *pdu; int pipes[2]; setup_pipes(pipes); @@ -251,64 +191,36 @@ START_TEST(test_multiple_pdus) /* Input 1 */ ck_assert_int_eq(32, write(pipes[1], input1, sizeof(input1))); - ck_assert_int_eq(0, pdustream_next(stream, &request)); + ck_assert_uint_eq(true, pdustream_next(stream, &request)); ck_assert_int_eq(stream->fd, request->fd); ck_assert_str_eq(stream->addr, request->client_addr); + ASSERT_RQ(request, RTR_V1); + ck_assert_uint_eq(8, request->pdu.raw.bytes_len); + ck_assert(memcmp(input1 + 20, request->pdu.raw.bytes, 8) == 0); ck_assert_uint_eq(0, request->eos); - assert_pdu_count(3, request); - - pdu = STAILQ_FIRST(&request->pdus); - ASSERT_RQ(pdu->obj.rq, RTR_V1, PDU_TYPE_RESET_QUERY, 0, 8); - ck_assert_uint_eq(8, pdu->raw.bytes_len); - ck_assert(memcmp(input1 + 0, pdu->raw.bytes, 8) == 0); - - pdu = STAILQ_NEXT(pdu, hook); - ASSERT_SQ(pdu->obj.sq, RTR_V1, PDU_TYPE_SERIAL_QUERY, 0, 12, 0x1020304); - ck_assert_uint_eq(12, pdu->raw.bytes_len); - ck_assert(memcmp(input1 + 8, pdu->raw.bytes, 12) == 0); - - pdu = STAILQ_NEXT(pdu, hook); - ASSERT_RQ(pdu->obj.rq, RTR_V1, PDU_TYPE_RESET_QUERY, 0x304, 8); - ck_assert_uint_eq(8, pdu->raw.bytes_len); - ck_assert(memcmp(input1 + 20, pdu->raw.bytes, 8) == 0); rtreq_destroy(request); /* Input 2 */ ck_assert_int_eq(12, write(pipes[1], input2, sizeof(input2))); - ck_assert_int_eq(0, pdustream_next(stream, &request)); + ck_assert_uint_eq(true, pdustream_next(stream, &request)); ck_assert_int_eq(stream->fd, request->fd); ck_assert_str_eq(stream->addr, request->client_addr); + ASSERT_RQ(request, RTR_V1); + ck_assert_uint_eq(8, request->pdu.raw.bytes_len); + ck_assert(memcmp(input2 + 4, request->pdu.raw.bytes, 8) == 0); ck_assert_uint_eq(0, request->eos); - assert_pdu_count(2, request); - - pdu = STAILQ_FIRST(&request->pdus); - ASSERT_RQ(pdu->obj.rq, RTR_V1, PDU_TYPE_RESET_QUERY, 0x304, 8); - ck_assert_uint_eq(8, pdu->raw.bytes_len); - ck_assert(memcmp(input1 + 28, &pdu->raw.bytes[0], 4) == 0); - ck_assert(memcmp(input2 + 0, &pdu->raw.bytes[4], 4) == 0); - - pdu = STAILQ_NEXT(pdu, hook); - ASSERT_RQ(pdu->obj.rq, RTR_V1, PDU_TYPE_RESET_QUERY, 0x607, 8); - ck_assert_uint_eq(8, pdu->raw.bytes_len); - ck_assert(memcmp(input2 + 4, pdu->raw.bytes, 8) == 0); rtreq_destroy(request); /* Input 3 */ close(pipes[1]); - ck_assert_int_eq(0, pdustream_next(stream, &request)); - - ck_assert_int_eq(stream->fd, request->fd); - ck_assert_str_eq(stream->addr, request->client_addr); - ck_assert_uint_eq(1, request->eos); - assert_pdu_count(0, request); - - rtreq_destroy(request); + ck_assert_uint_eq(false, pdustream_next(stream, &request)); + ck_assert_ptr_null(request); /* Clean up */ @@ -323,14 +235,10 @@ START_TEST(test_interrupted) struct rtr_request *request; stream = create_stream_fd(input, sizeof(input), RTR_V1); - ck_assert_int_eq(0, pdustream_next(stream, &request)); - ck_assert_int_eq(stream->fd, request->fd); - ck_assert_str_eq(stream->addr, request->client_addr); - ck_assert_uint_eq(1, request->eos); - assert_pdu_count(0, request); + ck_assert_uint_eq(false, pdustream_next(stream, &request)); + ck_assert_ptr_null(request); - rtreq_destroy(request); pdustream_destroy(&stream); } END_TEST