]> git.ipfire.org Git - thirdparty/freeradius-server.git/commitdiff
send one packet at a time
authorAlan T. DeKok <aland@freeradius.org>
Sun, 8 Dec 2024 12:48:57 +0000 (07:48 -0500)
committerAlan T. DeKok <aland@freeradius.org>
Sun, 8 Dec 2024 12:48:57 +0000 (07:48 -0500)
when we push queuing to the BIO layer, we can add a queue API
which the radius module can call:

* start queue entries X
* write
* commit queue

src/modules/rlm_radius2/bio.c

index deef2bbdfdbf74c924b9ccecd96ad43012c5a9e7..5525d30f690424b7d36003721591138e2e088019 100644 (file)
@@ -74,13 +74,6 @@ typedef struct {
        fr_bio_t                *bio;
        fr_bio_fd_info_t const  *fd_info;
 
-       struct mmsghdr          *mmsgvec;               //!< Vector of inbound/outbound packets.
-       bio_coalesced_t         *coalesced;             //!< Outbound coalesced requests.
-
-       size_t                  send_buff_actual;       //!< What we believe the maximum SO_SNDBUF size to be.
-                                                       ///< We don't try and encode more packet data than this
-                                                       ///< in one go.
-
        rlm_radius_t const      *inst;                  //!< Our module instance.
        bio_thread_t            *thread;
 
@@ -128,6 +121,7 @@ struct bio_request_s {
        uint8_t                 id;                     //!< Last ID assigned to this packet.
        uint8_t                 *packet;                //!< Packet we write to the network.
        size_t                  packet_len;             //!< Length of the packet.
+       size_t                  partial;                //!< partially sent data
 
        radius_track_entry_t    *rr;                    //!< ID tracking, resend count, etc.
        fr_event_timer_t const  *ev;                    //!< timer for retransmissions
@@ -632,7 +626,6 @@ static connection_state_t conn_init(void **h_out, connection_t *conn, void *uctx
        int                     fd;
        bio_handle_t            *h;
        bio_thread_t            *thread = talloc_get_type_abort(uctx, bio_thread_t);
-       uint16_t                i;
 
        MEM(h = talloc_zero(conn, bio_handle_t));
        h->thread = thread;
@@ -641,19 +634,6 @@ static connection_state_t conn_init(void **h_out, connection_t *conn, void *uctx
        h->max_packet_size = h->inst->max_packet_size;
        h->last_idle = fr_time();
 
-       /*
-        *      mmsgvec is pre-populated with pointers
-        *      to the iovec structs in coalesced, so we
-        *      just need to setup the iovec, and pass how
-        *      many messages we want to send to sendmmsg.
-        */
-       h->mmsgvec = talloc_zero_array(h, struct mmsghdr, h->inst->max_send_coalesce);
-       h->coalesced = talloc_zero_array(h, bio_coalesced_t, h->inst->max_send_coalesce);
-       for (i = 0; i < h->inst->max_send_coalesce; i++) {
-               h->mmsgvec[i].msg_hdr.msg_iov = &h->coalesced[i].out;
-               h->mmsgvec[i].msg_hdr.msg_iovlen = 1;
-       }
-
        MEM(h->buffer = talloc_array(h, uint8_t, h->max_packet_size));
        h->buflen = h->max_packet_size;
 
@@ -674,9 +654,6 @@ static connection_state_t conn_init(void **h_out, connection_t *conn, void *uctx
 
        talloc_set_destructor(h, _bio_handle_free);
 
-       h->send_buff_actual = h->inst->fd_config.send_buff_is_set ?
-               h->inst->fd_config.send_buff : h->max_packet_size * h->inst->max_send_coalesce;
-
        h->fd = fd;
 
        /*
@@ -1356,148 +1333,100 @@ static void request_mux(UNUSED fr_event_list_t *el,
 {
        bio_handle_t            *h = talloc_get_type_abort(conn->h, bio_handle_t);
        rlm_radius_t const      *inst = h->inst;
-       int                     sent;
-       uint16_t                i, queued;
-       size_t                  total_len = 0;
+       trunk_request_t         *treq;
+       bio_request_t           *u;
+       request_t               *request;
+       char const              *action;
+       uint8_t const           *packet;
+       size_t                  packet_len;
+       ssize_t                 slen;
+
+       if (unlikely(trunk_connection_pop_request(&treq, tconn) < 0)) return;
 
        /*
-        *      Encode multiple packets in preparation
-        *      for transmission with sendmmsg.
+        *      No more requests to send
         */
-       for (i = 0, queued = 0; (i < inst->max_send_coalesce) && (total_len < h->send_buff_actual); i++) {
-               trunk_request_t         *treq;
-               bio_request_t           *u;
-               request_t               *request;
+       if (!treq) return;
 
-               if (unlikely(trunk_connection_pop_request(&treq, tconn) < 0)) return;
+       fr_assert((treq->state == TRUNK_REQUEST_STATE_PENDING) ||
+                 (treq->state == TRUNK_REQUEST_STATE_PARTIAL));
 
-               /*
-                *      No more requests to send
-                */
-               if (!treq) break;
+       request = treq->request;
+       u = talloc_get_type_abort(treq->preq, bio_request_t);
 
-               fr_assert((treq->state == TRUNK_REQUEST_STATE_PENDING) ||
-                          (treq->state == TRUNK_REQUEST_STATE_PARTIAL));
+       fr_assert(!u->status_check);
 
-               request = treq->request;
-               u = talloc_get_type_abort(treq->preq, bio_request_t);
-
-               fr_assert(!u->status_check);
+       /*
+        *      Send partial packets first.
+        */
+       if (u->partial) {
+               fr_assert(u->partial < u->packet_len);
+               packet = u->packet + u->partial;
+               packet_len = u->packet_len - u->partial;
+               goto do_write;
+       }
 
-               /*
-                *      No previous packet, OR can't retransmit the
-                *      existing one.  Oh well.
-                *
-                *      Note that if we can't retransmit the previous
-                *      packet, then u->rr MUST already have been
-                *      deleted in the request_cancel() function
-                *      or request_release_conn() function when
-                *      the REQUEUE signal was received.
-                */
-               if (!u->packet) {
-                       fr_assert(!u->rr);
+       /*
+        *      No previous packet, OR can't retransmit the
+        *      existing one.  Oh well.
+        *
+        *      Note that if we can't retransmit the previous
+        *      packet, then u->rr MUST already have been
+        *      deleted in the request_cancel() function
+        *      or request_release_conn() function when
+        *      the REQUEUE signal was received.
+        */
+       if (!u->packet) {
+               fr_assert(!u->rr);
 
-                       if (unlikely(radius_track_entry_reserve(&u->rr, treq, h->tt, request, u->code, treq) < 0)) {
+               if (unlikely(radius_track_entry_reserve(&u->rr, treq, h->tt, request, u->code, treq) < 0)) {
 #ifndef NDEBUG
-                               radius_track_state_log(&default_log, L_ERR, __FILE__, __LINE__,
-                                                      h->tt, bio_tracking_entry_log);
+                       radius_track_state_log(&default_log, L_ERR, __FILE__, __LINE__,
+                                              h->tt, bio_tracking_entry_log);
 #endif
-                               fr_assert_fail("Tracking entry allocation failed: %s", fr_strerror());
-                               trunk_request_signal_fail(treq);
-                               continue;
-                       }
-                       u->id = u->rr->id;
-
-                       RDEBUG("Sending %s ID %d length %zu over connection %s",
-                              fr_radius_packet_name[u->code], u->id, u->packet_len, h->fd_info->name);
-
-                       if (encode(h->inst, request, u, u->id) < 0) {
-                               /*
-                                *      Need to do this because request_conn_release
-                                *      may not be called.
-                                */
-                               bio_request_reset(u);
-                               if (u->ev) (void) fr_event_timer_delete(&u->ev);
-                               trunk_request_signal_fail(treq);
-                               continue;
-                       }
-                       RHEXDUMP3(u->packet, u->packet_len, "Encoded packet");
+                       fr_assert_fail("Tracking entry allocation failed: %s", fr_strerror());
+                       trunk_request_signal_fail(treq);
+                       return;
+               }
+               u->id = u->rr->id;
+
+               RDEBUG("Sending %s ID %d length %zu over connection %s",
+                      fr_radius_packet_name[u->code], u->id, u->packet_len, h->fd_info->name);
 
+               if (encode(h->inst, request, u, u->id) < 0) {
                        /*
-                        *      Remember the authentication vector, which now has the
-                        *      packet signature.
+                        *      Need to do this because request_conn_release
+                        *      may not be called.
                         */
-                       (void) radius_track_entry_update(u->rr, u->packet + RADIUS_AUTH_VECTOR_OFFSET);
-               } else {
-                       RDEBUG("Retransmitting %s ID %d length %zu over connection %s",
-                              fr_radius_packet_name[u->code], u->id, u->packet_len, h->fd_info->name);
+                       bio_request_reset(u);
+                       if (u->ev) (void) fr_event_timer_delete(&u->ev);
+                       trunk_request_signal_fail(treq);
+                       return;
                }
-
-               log_request_pair_list(L_DBG_LVL_2, request, NULL, &request->request_pairs, NULL);
-               if (!fr_pair_list_empty(&u->extra)) log_request_pair_list(L_DBG_LVL_2, request, NULL, &u->extra, NULL);
-
-               /*
-                *      Record pointers to the buffer we'll be writing
-                *      We store the treq so we can place it back in
-                *      the pending state if the sendmmsg call fails.
-                */
-               h->coalesced[queued].treq = treq;
-               h->coalesced[queued].out.iov_base = u->packet;
-               h->coalesced[queued].out.iov_len = u->packet_len;
-
-               /*
-                *      Record how much data we have in total.
-                *
-                *      Try not to exceed the SO_SNDBUF value of the
-                *      socket as we potentially just waste CPU
-                *      time re-encoding the packets.
-                */
-               total_len += u->packet_len;
+               RHEXDUMP3(u->packet, u->packet_len, "Encoded packet");
 
                /*
-                *      Tell the trunk API that this request is now in
-                *      the "sent" state.  And we don't want to see
-                *      this request again. The request hasn't actually
-                *      been sent, but it's the only way to get at the
-                *      next entry in the heap.
+                *      Remember the authentication vector, which now has the
+                *      packet signature.
                 */
-               trunk_request_signal_sent(treq);
-               queued++;
+               (void) radius_track_entry_update(u->rr, u->packet + RADIUS_AUTH_VECTOR_OFFSET);
+       } else {
+               RDEBUG("Retransmitting %s ID %d length %zu over connection %s",
+                      fr_radius_packet_name[u->code], u->id, u->packet_len, h->fd_info->name);
        }
-       if (queued == 0) return;        /* No work */
-
-       /*
-        *      Verify nothing accidentally freed the connection handle
-        */
-       (void)talloc_get_type_abort(h, bio_handle_t);
 
-       switch (h->fd_info->socket.af) {
-               ssize_t slen;
+       log_request_pair_list(L_DBG_LVL_2, request, NULL, &request->request_pairs, NULL);
+       if (!fr_pair_list_empty(&u->extra)) log_request_pair_list(L_DBG_LVL_2, request, NULL, &u->extra, NULL);
 
-       case AF_INET:
-       case AF_INET6:
-               sent = sendmmsg(h->fd, h->mmsgvec, queued, 0);
-               fr_assert(0);
-               break;
-
-       default:
-               fr_assert(inst->max_send_coalesce == 1);
+       packet = u->packet;
+       packet_len = u->packet_len;
 
-               slen = fr_bio_write(h->bio, NULL, h->coalesced[0].out.iov_base, h->coalesced[0].out.iov_len);
-               if (slen < 0) {
-                       sent = -1;
-               } else {
-                       sent = 1;
-                       h->mmsgvec[0].msg_len = slen;
-               }
-               break;
-       }
-
-       /*
-        *      Send the coalesced datagrams
-        */
-       if (sent < 0) {         /* Error means no messages were sent */
-               sent = 0;
+do_write:
+       slen = fr_bio_write(h->bio, NULL, packet, packet_len);
+       if (slen < 0) {
+               /*
+                *      @todo - check slen for fr_bio_error(FOO)
+                */
 
                /*
                 *      Temporary conditions
@@ -1510,25 +1439,18 @@ static void request_mux(UNUSED fr_event_list_t *el,
                case EINTR:             /* Interrupted by signal */
                case ENOBUFS:           /* No outbound packet buffers, maybe? */
                case ENOMEM:            /* malloc failure in kernel? */
-                       WARN("%s - Failed sending data over connection %s: %s",
-                            h->module_name, h->fd_info->name, fr_syserror(errno));
+                       RWARN("%s - Failed sending data over connection %s: %s",
+                             h->module_name, h->fd_info->name, fr_syserror(errno));
+                       trunk_request_requeue(treq);
                        break;
 
                /*
                 *      Fatal, request specific conditions
-                *
-                *      sendmmsg will only return an error condition if the
-                *      first packet being sent errors.
-                *
-                *      When we get request specific errors, we need to fail
-                *      the first request in the set, and move the rest of
-                *      the packets back to the pending state.
                 */
                case EMSGSIZE:          /* Packet size exceeds max size allowed on socket */
                        ERROR("%s - Failed sending data over connection %s: %s",
                              h->module_name, h->fd_info->name, fr_syserror(errno));
-                       trunk_request_signal_fail(h->coalesced[0].treq);
-                       sent = 1;
+                       trunk_request_signal_fail(treq);
                        break;
 
                /*
@@ -1539,75 +1461,67 @@ static void request_mux(UNUSED fr_event_list_t *el,
                        ERROR("%s - Failed sending data over connection %s: %s",
                              h->module_name, h->fd_info->name, fr_syserror(errno));
                        trunk_connection_signal_reconnect(tconn, CONNECTION_FAILED);
-                       return;
+                       break;
                }
+
+               return;
        }
 
        /*
-        *      For all messages that were actually sent by sendmmsg,
-        *      say what's up.
+        *      No data to send, ignore the write for partials, but otherwise requeue it.
         */
-       for (i = 0; i < sent; i++) {
-               trunk_request_t         *treq = h->coalesced[i].treq;
-               bio_request_t           *u;
-               request_t               *request;
-               char const              *action;
-
-               /*
-                *      It's UDP so there should never be partial writes
-                */
-               fr_assert((size_t)h->mmsgvec[i].msg_len == h->mmsgvec[i].msg_hdr.msg_iov->iov_len);
-
-               fr_assert(treq->state == TRUNK_REQUEST_STATE_SENT);
+       if (slen == 0) {
+               if (u->partial) return;
 
-               request = treq->request;
-               u = talloc_get_type_abort(treq->preq, bio_request_t);
+               RWARN("%s - Failed sending data over connection %s: sent zero bytes",
+                     h->module_name, h->fd_info->name);
+               trunk_request_requeue(treq);
+       }
 
-               /*
-                *      Don't print anything more for replicated requests.
-                */
-               if (inst->replicate) {
-                       bio_result_t *r = talloc_get_type_abort(treq->rctx, bio_result_t);
+       packet_len += slen;
+       if (packet_len < u->packet_len) {
+               u->partial = packet_len;
+               trunk_request_signal_partial(treq);
+               return;
+       }
 
-                       r->rcode = RLM_MODULE_OK;
-                       trunk_request_signal_complete(treq);
-               }
+       /*
+        *      Don't print anything more for replicated requests.
+        */
+       if (inst->replicate) {
+               bio_result_t *r = talloc_get_type_abort(treq->rctx, bio_result_t);
 
-               /*
-                *      Tell the admin what's going on
-                */
-               if (u->retry.count == 1) {
-                       action = inst->originate ? "Originated" : "Proxied";
-                       h->last_sent = u->retry.start;
-                       if (fr_time_lteq(h->first_sent, h->last_idle)) h->first_sent = h->last_sent;
+               r->rcode = RLM_MODULE_OK;
+               trunk_request_signal_complete(treq);
+       } else {
+               trunk_request_signal_sent(treq);
+       }
 
-               } else {
-                       action = "Retransmitted";
-               }
+       /*
+        *      Tell the admin what's going on
+        */
+       if (u->retry.count == 1) {
+               action = inst->originate ? "Originated" : "Proxied";
+               h->last_sent = u->retry.start;
+               if (fr_time_lteq(h->first_sent, h->last_idle)) h->first_sent = h->last_sent;
 
-               fr_assert(!u->status_check);
+       } else {
+               action = "Retransmitted";
+       }
 
+       fr_assert(!u->status_check);
 
-               if (!inst->synchronous) {
-                       RDEBUG("%s request.  Expecting response within %pVs", action,
-                              fr_box_time_delta(u->retry.rt));
+       if (!inst->synchronous) {
+               RDEBUG("%s request.  Expecting response within %pVs", action,
+                      fr_box_time_delta(u->retry.rt));
 
-               } else {
-                       /*
-                        *      If the packet doesn't get a response,
-                        *      then bio_request_free() will notice, and run conn_zombie()
-                        */
-                       RDEBUG("%s request.  Relying on NAS to perform more retransmissions", action);
-               }
+       } else {
+               /*
+                *      If the packet doesn't get a response,
+                *      then bio_request_free() will notice, and run conn_zombie()
+                */
+               RDEBUG("%s request.  Relying on NAS to perform more retransmissions", action);
        }
-
-       /*
-        *      Requests that weren't sent get re-enqueued
-        *
-        *      The cancel logic runs as per-normal and cleans up
-        *      the request ready for sending again...
-        */
-       for (i = sent; i < queued; i++) trunk_request_requeue(h->coalesced[i].treq);
 }
 
 /** Deal with Protocol-Error replies, and possible negotiation