fr_bio_t *bio;
fr_bio_fd_info_t const *fd_info;
- struct mmsghdr *mmsgvec; //!< Vector of inbound/outbound packets.
- bio_coalesced_t *coalesced; //!< Outbound coalesced requests.
-
- size_t send_buff_actual; //!< What we believe the maximum SO_SNDBUF size to be.
- ///< We don't try and encode more packet data than this
- ///< in one go.
-
rlm_radius_t const *inst; //!< Our module instance.
bio_thread_t *thread;
uint8_t id; //!< Last ID assigned to this packet.
uint8_t *packet; //!< Packet we write to the network.
size_t packet_len; //!< Length of the packet.
+ size_t partial; //!< partially sent data
radius_track_entry_t *rr; //!< ID tracking, resend count, etc.
fr_event_timer_t const *ev; //!< timer for retransmissions
int fd;
bio_handle_t *h;
bio_thread_t *thread = talloc_get_type_abort(uctx, bio_thread_t);
- uint16_t i;
MEM(h = talloc_zero(conn, bio_handle_t));
h->thread = thread;
h->max_packet_size = h->inst->max_packet_size;
h->last_idle = fr_time();
- /*
- * mmsgvec is pre-populated with pointers
- * to the iovec structs in coalesced, so we
- * just need to setup the iovec, and pass how
- * many messages we want to send to sendmmsg.
- */
- h->mmsgvec = talloc_zero_array(h, struct mmsghdr, h->inst->max_send_coalesce);
- h->coalesced = talloc_zero_array(h, bio_coalesced_t, h->inst->max_send_coalesce);
- for (i = 0; i < h->inst->max_send_coalesce; i++) {
- h->mmsgvec[i].msg_hdr.msg_iov = &h->coalesced[i].out;
- h->mmsgvec[i].msg_hdr.msg_iovlen = 1;
- }
-
MEM(h->buffer = talloc_array(h, uint8_t, h->max_packet_size));
h->buflen = h->max_packet_size;
talloc_set_destructor(h, _bio_handle_free);
- h->send_buff_actual = h->inst->fd_config.send_buff_is_set ?
- h->inst->fd_config.send_buff : h->max_packet_size * h->inst->max_send_coalesce;
-
h->fd = fd;
/*
{
bio_handle_t *h = talloc_get_type_abort(conn->h, bio_handle_t);
rlm_radius_t const *inst = h->inst;
- int sent;
- uint16_t i, queued;
- size_t total_len = 0;
+ trunk_request_t *treq;
+ bio_request_t *u;
+ request_t *request;
+ char const *action;
+ uint8_t const *packet;
+ size_t packet_len;
+ ssize_t slen;
+
+ if (unlikely(trunk_connection_pop_request(&treq, tconn) < 0)) return;
/*
- * Encode multiple packets in preparation
- * for transmission with sendmmsg.
+ * No more requests to send
*/
- for (i = 0, queued = 0; (i < inst->max_send_coalesce) && (total_len < h->send_buff_actual); i++) {
- trunk_request_t *treq;
- bio_request_t *u;
- request_t *request;
+ if (!treq) return;
- if (unlikely(trunk_connection_pop_request(&treq, tconn) < 0)) return;
+ fr_assert((treq->state == TRUNK_REQUEST_STATE_PENDING) ||
+ (treq->state == TRUNK_REQUEST_STATE_PARTIAL));
- /*
- * No more requests to send
- */
- if (!treq) break;
+ request = treq->request;
+ u = talloc_get_type_abort(treq->preq, bio_request_t);
- fr_assert((treq->state == TRUNK_REQUEST_STATE_PENDING) ||
- (treq->state == TRUNK_REQUEST_STATE_PARTIAL));
+ fr_assert(!u->status_check);
- request = treq->request;
- u = talloc_get_type_abort(treq->preq, bio_request_t);
-
- fr_assert(!u->status_check);
+ /*
+ * Send partial packets first.
+ */
+ if (u->partial) {
+ fr_assert(u->partial < u->packet_len);
+ packet = u->packet + u->partial;
+ packet_len = u->packet_len - u->partial;
+ goto do_write;
+ }
- /*
- * No previous packet, OR can't retransmit the
- * existing one. Oh well.
- *
- * Note that if we can't retransmit the previous
- * packet, then u->rr MUST already have been
- * deleted in the request_cancel() function
- * or request_release_conn() function when
- * the REQUEUE signal was received.
- */
- if (!u->packet) {
- fr_assert(!u->rr);
+ /*
+ * No previous packet, OR can't retransmit the
+ * existing one. Oh well.
+ *
+ * Note that if we can't retransmit the previous
+ * packet, then u->rr MUST already have been
+ * deleted in the request_cancel() function
+ * or request_release_conn() function when
+ * the REQUEUE signal was received.
+ */
+ if (!u->packet) {
+ fr_assert(!u->rr);
- if (unlikely(radius_track_entry_reserve(&u->rr, treq, h->tt, request, u->code, treq) < 0)) {
+ if (unlikely(radius_track_entry_reserve(&u->rr, treq, h->tt, request, u->code, treq) < 0)) {
#ifndef NDEBUG
- radius_track_state_log(&default_log, L_ERR, __FILE__, __LINE__,
- h->tt, bio_tracking_entry_log);
+ radius_track_state_log(&default_log, L_ERR, __FILE__, __LINE__,
+ h->tt, bio_tracking_entry_log);
#endif
- fr_assert_fail("Tracking entry allocation failed: %s", fr_strerror());
- trunk_request_signal_fail(treq);
- continue;
- }
- u->id = u->rr->id;
-
- RDEBUG("Sending %s ID %d length %zu over connection %s",
- fr_radius_packet_name[u->code], u->id, u->packet_len, h->fd_info->name);
-
- if (encode(h->inst, request, u, u->id) < 0) {
- /*
- * Need to do this because request_conn_release
- * may not be called.
- */
- bio_request_reset(u);
- if (u->ev) (void) fr_event_timer_delete(&u->ev);
- trunk_request_signal_fail(treq);
- continue;
- }
- RHEXDUMP3(u->packet, u->packet_len, "Encoded packet");
+ fr_assert_fail("Tracking entry allocation failed: %s", fr_strerror());
+ trunk_request_signal_fail(treq);
+ return;
+ }
+ u->id = u->rr->id;
+
+ RDEBUG("Sending %s ID %d length %zu over connection %s",
+ fr_radius_packet_name[u->code], u->id, u->packet_len, h->fd_info->name);
+ if (encode(h->inst, request, u, u->id) < 0) {
/*
- * Remember the authentication vector, which now has the
- * packet signature.
+ * Need to do this because request_conn_release
+ * may not be called.
*/
- (void) radius_track_entry_update(u->rr, u->packet + RADIUS_AUTH_VECTOR_OFFSET);
- } else {
- RDEBUG("Retransmitting %s ID %d length %zu over connection %s",
- fr_radius_packet_name[u->code], u->id, u->packet_len, h->fd_info->name);
+ bio_request_reset(u);
+ if (u->ev) (void) fr_event_timer_delete(&u->ev);
+ trunk_request_signal_fail(treq);
+ return;
}
-
- log_request_pair_list(L_DBG_LVL_2, request, NULL, &request->request_pairs, NULL);
- if (!fr_pair_list_empty(&u->extra)) log_request_pair_list(L_DBG_LVL_2, request, NULL, &u->extra, NULL);
-
- /*
- * Record pointers to the buffer we'll be writing
- * We store the treq so we can place it back in
- * the pending state if the sendmmsg call fails.
- */
- h->coalesced[queued].treq = treq;
- h->coalesced[queued].out.iov_base = u->packet;
- h->coalesced[queued].out.iov_len = u->packet_len;
-
- /*
- * Record how much data we have in total.
- *
- * Try not to exceed the SO_SNDBUF value of the
- * socket as we potentially just waste CPU
- * time re-encoding the packets.
- */
- total_len += u->packet_len;
+ RHEXDUMP3(u->packet, u->packet_len, "Encoded packet");
/*
- * Tell the trunk API that this request is now in
- * the "sent" state. And we don't want to see
- * this request again. The request hasn't actually
- * been sent, but it's the only way to get at the
- * next entry in the heap.
+ * Remember the authentication vector, which now has the
+ * packet signature.
*/
- trunk_request_signal_sent(treq);
- queued++;
+ (void) radius_track_entry_update(u->rr, u->packet + RADIUS_AUTH_VECTOR_OFFSET);
+ } else {
+ RDEBUG("Retransmitting %s ID %d length %zu over connection %s",
+ fr_radius_packet_name[u->code], u->id, u->packet_len, h->fd_info->name);
}
- if (queued == 0) return; /* No work */
-
- /*
- * Verify nothing accidentally freed the connection handle
- */
- (void)talloc_get_type_abort(h, bio_handle_t);
- switch (h->fd_info->socket.af) {
- ssize_t slen;
+ log_request_pair_list(L_DBG_LVL_2, request, NULL, &request->request_pairs, NULL);
+ if (!fr_pair_list_empty(&u->extra)) log_request_pair_list(L_DBG_LVL_2, request, NULL, &u->extra, NULL);
- case AF_INET:
- case AF_INET6:
- sent = sendmmsg(h->fd, h->mmsgvec, queued, 0);
- fr_assert(0);
- break;
-
- default:
- fr_assert(inst->max_send_coalesce == 1);
+ packet = u->packet;
+ packet_len = u->packet_len;
- slen = fr_bio_write(h->bio, NULL, h->coalesced[0].out.iov_base, h->coalesced[0].out.iov_len);
- if (slen < 0) {
- sent = -1;
- } else {
- sent = 1;
- h->mmsgvec[0].msg_len = slen;
- }
- break;
- }
-
- /*
- * Send the coalesced datagrams
- */
- if (sent < 0) { /* Error means no messages were sent */
- sent = 0;
+do_write:
+ slen = fr_bio_write(h->bio, NULL, packet, packet_len);
+ if (slen < 0) {
+ /*
+ * @todo - check slen for fr_bio_error(FOO)
+ */
/*
* Temporary conditions
case EINTR: /* Interrupted by signal */
case ENOBUFS: /* No outbound packet buffers, maybe? */
case ENOMEM: /* malloc failure in kernel? */
- WARN("%s - Failed sending data over connection %s: %s",
- h->module_name, h->fd_info->name, fr_syserror(errno));
+ RWARN("%s - Failed sending data over connection %s: %s",
+ h->module_name, h->fd_info->name, fr_syserror(errno));
+ trunk_request_requeue(treq);
break;
/*
* Fatal, request specific conditions
- *
- * sendmmsg will only return an error condition if the
- * first packet being sent errors.
- *
- * When we get request specific errors, we need to fail
- * the first request in the set, and move the rest of
- * the packets back to the pending state.
*/
case EMSGSIZE: /* Packet size exceeds max size allowed on socket */
ERROR("%s - Failed sending data over connection %s: %s",
h->module_name, h->fd_info->name, fr_syserror(errno));
- trunk_request_signal_fail(h->coalesced[0].treq);
- sent = 1;
+ trunk_request_signal_fail(treq);
break;
/*
ERROR("%s - Failed sending data over connection %s: %s",
h->module_name, h->fd_info->name, fr_syserror(errno));
trunk_connection_signal_reconnect(tconn, CONNECTION_FAILED);
- return;
+ break;
}
+
+ return;
}
/*
- * For all messages that were actually sent by sendmmsg,
- * say what's up.
+ * No data to send, ignore the write for partials, but otherwise requeue it.
*/
- for (i = 0; i < sent; i++) {
- trunk_request_t *treq = h->coalesced[i].treq;
- bio_request_t *u;
- request_t *request;
- char const *action;
-
- /*
- * It's UDP so there should never be partial writes
- */
- fr_assert((size_t)h->mmsgvec[i].msg_len == h->mmsgvec[i].msg_hdr.msg_iov->iov_len);
-
- fr_assert(treq->state == TRUNK_REQUEST_STATE_SENT);
+ if (slen == 0) {
+ if (u->partial) return;
- request = treq->request;
- u = talloc_get_type_abort(treq->preq, bio_request_t);
+ RWARN("%s - Failed sending data over connection %s: sent zero bytes",
+ h->module_name, h->fd_info->name);
+ trunk_request_requeue(treq);
+ }
- /*
- * Don't print anything more for replicated requests.
- */
- if (inst->replicate) {
- bio_result_t *r = talloc_get_type_abort(treq->rctx, bio_result_t);
+ packet_len += slen;
+ if (packet_len < u->packet_len) {
+ u->partial = packet_len;
+ trunk_request_signal_partial(treq);
+ return;
+ }
- r->rcode = RLM_MODULE_OK;
- trunk_request_signal_complete(treq);
- }
+ /*
+ * Don't print anything more for replicated requests.
+ */
+ if (inst->replicate) {
+ bio_result_t *r = talloc_get_type_abort(treq->rctx, bio_result_t);
- /*
- * Tell the admin what's going on
- */
- if (u->retry.count == 1) {
- action = inst->originate ? "Originated" : "Proxied";
- h->last_sent = u->retry.start;
- if (fr_time_lteq(h->first_sent, h->last_idle)) h->first_sent = h->last_sent;
+ r->rcode = RLM_MODULE_OK;
+ trunk_request_signal_complete(treq);
+ } else {
+ trunk_request_signal_sent(treq);
+ }
- } else {
- action = "Retransmitted";
- }
+ /*
+ * Tell the admin what's going on
+ */
+ if (u->retry.count == 1) {
+ action = inst->originate ? "Originated" : "Proxied";
+ h->last_sent = u->retry.start;
+ if (fr_time_lteq(h->first_sent, h->last_idle)) h->first_sent = h->last_sent;
- fr_assert(!u->status_check);
+ } else {
+ action = "Retransmitted";
+ }
+ fr_assert(!u->status_check);
- if (!inst->synchronous) {
- RDEBUG("%s request. Expecting response within %pVs", action,
- fr_box_time_delta(u->retry.rt));
+ if (!inst->synchronous) {
+ RDEBUG("%s request. Expecting response within %pVs", action,
+ fr_box_time_delta(u->retry.rt));
- } else {
- /*
- * If the packet doesn't get a response,
- * then bio_request_free() will notice, and run conn_zombie()
- */
- RDEBUG("%s request. Relying on NAS to perform more retransmissions", action);
- }
+ } else {
+ /*
+ * If the packet doesn't get a response,
+ * then bio_request_free() will notice, and run conn_zombie()
+ */
+ RDEBUG("%s request. Relying on NAS to perform more retransmissions", action);
}
-
- /*
- * Requests that weren't sent get re-enqueued
- *
- * The cancel logic runs as per-normal and cleans up
- * the request ready for sending again...
- */
- for (i = sent; i < queued; i++) trunk_request_requeue(h->coalesced[i].treq);
}
/** Deal with Protocol-Error replies, and possible negotiation