From: Alan T. DeKok Date: Sun, 8 Dec 2024 12:48:57 +0000 (-0500) Subject: send one packet at a time X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=f98498af7418ed8dcd65b11da7b8779e3c379dc0;p=thirdparty%2Ffreeradius-server.git send one packet at a time when we push queuing to the BIO layer, we can add a queue API which the radius module can call: * start queue entries X * write * commit queue --- diff --git a/src/modules/rlm_radius2/bio.c b/src/modules/rlm_radius2/bio.c index deef2bbdfdb..5525d30f690 100644 --- a/src/modules/rlm_radius2/bio.c +++ b/src/modules/rlm_radius2/bio.c @@ -74,13 +74,6 @@ typedef struct { fr_bio_t *bio; fr_bio_fd_info_t const *fd_info; - struct mmsghdr *mmsgvec; //!< Vector of inbound/outbound packets. - bio_coalesced_t *coalesced; //!< Outbound coalesced requests. - - size_t send_buff_actual; //!< What we believe the maximum SO_SNDBUF size to be. - ///< We don't try and encode more packet data than this - ///< in one go. - rlm_radius_t const *inst; //!< Our module instance. bio_thread_t *thread; @@ -128,6 +121,7 @@ struct bio_request_s { uint8_t id; //!< Last ID assigned to this packet. uint8_t *packet; //!< Packet we write to the network. size_t packet_len; //!< Length of the packet. + size_t partial; //!< partially sent data radius_track_entry_t *rr; //!< ID tracking, resend count, etc. fr_event_timer_t const *ev; //!< timer for retransmissions @@ -632,7 +626,6 @@ static connection_state_t conn_init(void **h_out, connection_t *conn, void *uctx int fd; bio_handle_t *h; bio_thread_t *thread = talloc_get_type_abort(uctx, bio_thread_t); - uint16_t i; MEM(h = talloc_zero(conn, bio_handle_t)); h->thread = thread; @@ -641,19 +634,6 @@ static connection_state_t conn_init(void **h_out, connection_t *conn, void *uctx h->max_packet_size = h->inst->max_packet_size; h->last_idle = fr_time(); - /* - * mmsgvec is pre-populated with pointers - * to the iovec structs in coalesced, so we - * just need to setup the iovec, and pass how - * many messages we want to send to sendmmsg. - */ - h->mmsgvec = talloc_zero_array(h, struct mmsghdr, h->inst->max_send_coalesce); - h->coalesced = talloc_zero_array(h, bio_coalesced_t, h->inst->max_send_coalesce); - for (i = 0; i < h->inst->max_send_coalesce; i++) { - h->mmsgvec[i].msg_hdr.msg_iov = &h->coalesced[i].out; - h->mmsgvec[i].msg_hdr.msg_iovlen = 1; - } - MEM(h->buffer = talloc_array(h, uint8_t, h->max_packet_size)); h->buflen = h->max_packet_size; @@ -674,9 +654,6 @@ static connection_state_t conn_init(void **h_out, connection_t *conn, void *uctx talloc_set_destructor(h, _bio_handle_free); - h->send_buff_actual = h->inst->fd_config.send_buff_is_set ? - h->inst->fd_config.send_buff : h->max_packet_size * h->inst->max_send_coalesce; - h->fd = fd; /* @@ -1356,148 +1333,100 @@ static void request_mux(UNUSED fr_event_list_t *el, { bio_handle_t *h = talloc_get_type_abort(conn->h, bio_handle_t); rlm_radius_t const *inst = h->inst; - int sent; - uint16_t i, queued; - size_t total_len = 0; + trunk_request_t *treq; + bio_request_t *u; + request_t *request; + char const *action; + uint8_t const *packet; + size_t packet_len; + ssize_t slen; + + if (unlikely(trunk_connection_pop_request(&treq, tconn) < 0)) return; /* - * Encode multiple packets in preparation - * for transmission with sendmmsg. + * No more requests to send */ - for (i = 0, queued = 0; (i < inst->max_send_coalesce) && (total_len < h->send_buff_actual); i++) { - trunk_request_t *treq; - bio_request_t *u; - request_t *request; + if (!treq) return; - if (unlikely(trunk_connection_pop_request(&treq, tconn) < 0)) return; + fr_assert((treq->state == TRUNK_REQUEST_STATE_PENDING) || + (treq->state == TRUNK_REQUEST_STATE_PARTIAL)); - /* - * No more requests to send - */ - if (!treq) break; + request = treq->request; + u = talloc_get_type_abort(treq->preq, bio_request_t); - fr_assert((treq->state == TRUNK_REQUEST_STATE_PENDING) || - (treq->state == TRUNK_REQUEST_STATE_PARTIAL)); + fr_assert(!u->status_check); - request = treq->request; - u = talloc_get_type_abort(treq->preq, bio_request_t); - - fr_assert(!u->status_check); + /* + * Send partial packets first. + */ + if (u->partial) { + fr_assert(u->partial < u->packet_len); + packet = u->packet + u->partial; + packet_len = u->packet_len - u->partial; + goto do_write; + } - /* - * No previous packet, OR can't retransmit the - * existing one. Oh well. - * - * Note that if we can't retransmit the previous - * packet, then u->rr MUST already have been - * deleted in the request_cancel() function - * or request_release_conn() function when - * the REQUEUE signal was received. - */ - if (!u->packet) { - fr_assert(!u->rr); + /* + * No previous packet, OR can't retransmit the + * existing one. Oh well. + * + * Note that if we can't retransmit the previous + * packet, then u->rr MUST already have been + * deleted in the request_cancel() function + * or request_release_conn() function when + * the REQUEUE signal was received. + */ + if (!u->packet) { + fr_assert(!u->rr); - if (unlikely(radius_track_entry_reserve(&u->rr, treq, h->tt, request, u->code, treq) < 0)) { + if (unlikely(radius_track_entry_reserve(&u->rr, treq, h->tt, request, u->code, treq) < 0)) { #ifndef NDEBUG - radius_track_state_log(&default_log, L_ERR, __FILE__, __LINE__, - h->tt, bio_tracking_entry_log); + radius_track_state_log(&default_log, L_ERR, __FILE__, __LINE__, + h->tt, bio_tracking_entry_log); #endif - fr_assert_fail("Tracking entry allocation failed: %s", fr_strerror()); - trunk_request_signal_fail(treq); - continue; - } - u->id = u->rr->id; - - RDEBUG("Sending %s ID %d length %zu over connection %s", - fr_radius_packet_name[u->code], u->id, u->packet_len, h->fd_info->name); - - if (encode(h->inst, request, u, u->id) < 0) { - /* - * Need to do this because request_conn_release - * may not be called. - */ - bio_request_reset(u); - if (u->ev) (void) fr_event_timer_delete(&u->ev); - trunk_request_signal_fail(treq); - continue; - } - RHEXDUMP3(u->packet, u->packet_len, "Encoded packet"); + fr_assert_fail("Tracking entry allocation failed: %s", fr_strerror()); + trunk_request_signal_fail(treq); + return; + } + u->id = u->rr->id; + + RDEBUG("Sending %s ID %d length %zu over connection %s", + fr_radius_packet_name[u->code], u->id, u->packet_len, h->fd_info->name); + if (encode(h->inst, request, u, u->id) < 0) { /* - * Remember the authentication vector, which now has the - * packet signature. + * Need to do this because request_conn_release + * may not be called. */ - (void) radius_track_entry_update(u->rr, u->packet + RADIUS_AUTH_VECTOR_OFFSET); - } else { - RDEBUG("Retransmitting %s ID %d length %zu over connection %s", - fr_radius_packet_name[u->code], u->id, u->packet_len, h->fd_info->name); + bio_request_reset(u); + if (u->ev) (void) fr_event_timer_delete(&u->ev); + trunk_request_signal_fail(treq); + return; } - - log_request_pair_list(L_DBG_LVL_2, request, NULL, &request->request_pairs, NULL); - if (!fr_pair_list_empty(&u->extra)) log_request_pair_list(L_DBG_LVL_2, request, NULL, &u->extra, NULL); - - /* - * Record pointers to the buffer we'll be writing - * We store the treq so we can place it back in - * the pending state if the sendmmsg call fails. - */ - h->coalesced[queued].treq = treq; - h->coalesced[queued].out.iov_base = u->packet; - h->coalesced[queued].out.iov_len = u->packet_len; - - /* - * Record how much data we have in total. - * - * Try not to exceed the SO_SNDBUF value of the - * socket as we potentially just waste CPU - * time re-encoding the packets. - */ - total_len += u->packet_len; + RHEXDUMP3(u->packet, u->packet_len, "Encoded packet"); /* - * Tell the trunk API that this request is now in - * the "sent" state. And we don't want to see - * this request again. The request hasn't actually - * been sent, but it's the only way to get at the - * next entry in the heap. + * Remember the authentication vector, which now has the + * packet signature. */ - trunk_request_signal_sent(treq); - queued++; + (void) radius_track_entry_update(u->rr, u->packet + RADIUS_AUTH_VECTOR_OFFSET); + } else { + RDEBUG("Retransmitting %s ID %d length %zu over connection %s", + fr_radius_packet_name[u->code], u->id, u->packet_len, h->fd_info->name); } - if (queued == 0) return; /* No work */ - - /* - * Verify nothing accidentally freed the connection handle - */ - (void)talloc_get_type_abort(h, bio_handle_t); - switch (h->fd_info->socket.af) { - ssize_t slen; + log_request_pair_list(L_DBG_LVL_2, request, NULL, &request->request_pairs, NULL); + if (!fr_pair_list_empty(&u->extra)) log_request_pair_list(L_DBG_LVL_2, request, NULL, &u->extra, NULL); - case AF_INET: - case AF_INET6: - sent = sendmmsg(h->fd, h->mmsgvec, queued, 0); - fr_assert(0); - break; - - default: - fr_assert(inst->max_send_coalesce == 1); + packet = u->packet; + packet_len = u->packet_len; - slen = fr_bio_write(h->bio, NULL, h->coalesced[0].out.iov_base, h->coalesced[0].out.iov_len); - if (slen < 0) { - sent = -1; - } else { - sent = 1; - h->mmsgvec[0].msg_len = slen; - } - break; - } - - /* - * Send the coalesced datagrams - */ - if (sent < 0) { /* Error means no messages were sent */ - sent = 0; +do_write: + slen = fr_bio_write(h->bio, NULL, packet, packet_len); + if (slen < 0) { + /* + * @todo - check slen for fr_bio_error(FOO) + */ /* * Temporary conditions @@ -1510,25 +1439,18 @@ static void request_mux(UNUSED fr_event_list_t *el, case EINTR: /* Interrupted by signal */ case ENOBUFS: /* No outbound packet buffers, maybe? */ case ENOMEM: /* malloc failure in kernel? */ - WARN("%s - Failed sending data over connection %s: %s", - h->module_name, h->fd_info->name, fr_syserror(errno)); + RWARN("%s - Failed sending data over connection %s: %s", + h->module_name, h->fd_info->name, fr_syserror(errno)); + trunk_request_requeue(treq); break; /* * Fatal, request specific conditions - * - * sendmmsg will only return an error condition if the - * first packet being sent errors. - * - * When we get request specific errors, we need to fail - * the first request in the set, and move the rest of - * the packets back to the pending state. */ case EMSGSIZE: /* Packet size exceeds max size allowed on socket */ ERROR("%s - Failed sending data over connection %s: %s", h->module_name, h->fd_info->name, fr_syserror(errno)); - trunk_request_signal_fail(h->coalesced[0].treq); - sent = 1; + trunk_request_signal_fail(treq); break; /* @@ -1539,75 +1461,67 @@ static void request_mux(UNUSED fr_event_list_t *el, ERROR("%s - Failed sending data over connection %s: %s", h->module_name, h->fd_info->name, fr_syserror(errno)); trunk_connection_signal_reconnect(tconn, CONNECTION_FAILED); - return; + break; } + + return; } /* - * For all messages that were actually sent by sendmmsg, - * say what's up. + * No data to send, ignore the write for partials, but otherwise requeue it. */ - for (i = 0; i < sent; i++) { - trunk_request_t *treq = h->coalesced[i].treq; - bio_request_t *u; - request_t *request; - char const *action; - - /* - * It's UDP so there should never be partial writes - */ - fr_assert((size_t)h->mmsgvec[i].msg_len == h->mmsgvec[i].msg_hdr.msg_iov->iov_len); - - fr_assert(treq->state == TRUNK_REQUEST_STATE_SENT); + if (slen == 0) { + if (u->partial) return; - request = treq->request; - u = talloc_get_type_abort(treq->preq, bio_request_t); + RWARN("%s - Failed sending data over connection %s: sent zero bytes", + h->module_name, h->fd_info->name); + trunk_request_requeue(treq); + } - /* - * Don't print anything more for replicated requests. - */ - if (inst->replicate) { - bio_result_t *r = talloc_get_type_abort(treq->rctx, bio_result_t); + packet_len += slen; + if (packet_len < u->packet_len) { + u->partial = packet_len; + trunk_request_signal_partial(treq); + return; + } - r->rcode = RLM_MODULE_OK; - trunk_request_signal_complete(treq); - } + /* + * Don't print anything more for replicated requests. + */ + if (inst->replicate) { + bio_result_t *r = talloc_get_type_abort(treq->rctx, bio_result_t); - /* - * Tell the admin what's going on - */ - if (u->retry.count == 1) { - action = inst->originate ? "Originated" : "Proxied"; - h->last_sent = u->retry.start; - if (fr_time_lteq(h->first_sent, h->last_idle)) h->first_sent = h->last_sent; + r->rcode = RLM_MODULE_OK; + trunk_request_signal_complete(treq); + } else { + trunk_request_signal_sent(treq); + } - } else { - action = "Retransmitted"; - } + /* + * Tell the admin what's going on + */ + if (u->retry.count == 1) { + action = inst->originate ? "Originated" : "Proxied"; + h->last_sent = u->retry.start; + if (fr_time_lteq(h->first_sent, h->last_idle)) h->first_sent = h->last_sent; - fr_assert(!u->status_check); + } else { + action = "Retransmitted"; + } + fr_assert(!u->status_check); - if (!inst->synchronous) { - RDEBUG("%s request. Expecting response within %pVs", action, - fr_box_time_delta(u->retry.rt)); + if (!inst->synchronous) { + RDEBUG("%s request. Expecting response within %pVs", action, + fr_box_time_delta(u->retry.rt)); - } else { - /* - * If the packet doesn't get a response, - * then bio_request_free() will notice, and run conn_zombie() - */ - RDEBUG("%s request. Relying on NAS to perform more retransmissions", action); - } + } else { + /* + * If the packet doesn't get a response, + * then bio_request_free() will notice, and run conn_zombie() + */ + RDEBUG("%s request. Relying on NAS to perform more retransmissions", action); } - - /* - * Requests that weren't sent get re-enqueued - * - * The cancel logic runs as per-normal and cleans up - * the request ready for sending again... - */ - for (i = sent; i < queued; i++) trunk_request_requeue(h->coalesced[i].treq); } /** Deal with Protocol-Error replies, and possible negotiation