}
}
+/*
+ * This is the same as request_mux(), except that we immediately mark the request as complete.
+ */
+CC_NO_UBSAN(function) /* UBSAN: false positive - public vs private connection_t trips --fsanitize=function*/
+static void request_replicate_mux(UNUSED fr_event_list_t *el,
+ trunk_connection_t *tconn, connection_t *conn, UNUSED void *uctx)
+{
+ bio_handle_t *h = talloc_get_type_abort(conn->h, bio_handle_t);
+ trunk_request_t *treq;
+ request_t *request;
+ bio_request_t *u;
+
+ if (unlikely(trunk_connection_pop_request(&treq, tconn) < 0)) return;
+
+ /*
+ * No more requests to send
+ */
+ if (!treq) return;
+
+ request = treq->request;
+
+ mod_write(request, treq, h);
+
+ /*
+ * We don't care about the reply, so the request is immediately finished.
+ */
+ u = treq->preq;
+ u->rcode = RLM_MODULE_OK;
+ trunk_request_signal_complete(treq);
+}
+
+CC_NO_UBSAN(function) /* UBSAN: false positive - public vs private connection_t trips --fsanitize=function*/
+static void request_replicate_demux(UNUSED fr_event_list_t *el, trunk_connection_t *tconn, connection_t *conn, UNUSED void *uctx)
+{
+ bio_handle_t *h = talloc_get_type_abort(conn->h, bio_handle_t);
+
+ DEBUG3("%s - Reading data for connection %s", h->ctx.module_name, h->ctx.fd_info->name);
+
+ while (true) {
+ ssize_t slen;
+
+ trunk_request_t *treq;
+ request_t *request;
+ bio_request_t *u;
+ radius_track_entry_t *rr;
+ fr_radius_decode_fail_t reason;
+ uint8_t code = 0;
+ fr_pair_list_t reply;
+
+ fr_time_t now;
+
+ fr_pair_list_init(&reply);
+
+ /*
+ * Drain the socket of all packets. If we're busy, this
+ * saves a round through the event loop. If we're not
+ * busy, a few extra system calls don't matter.
+ */
+ slen = fr_bio_read(h->bio.read, NULL, h->buffer, h->buflen);
+ if (slen == 0) {
+ /*
+ * @todo - set BIO FD EOF callback, so that we don't have to check it here.
+ */
+ if (h->ctx.fd_info->eof) trunk_connection_signal_reconnect(tconn, CONNECTION_FAILED);
+ return;
+ }
+
+ /*
+ * We're done reading, return.
+ */
+ if (slen == fr_bio_error(IO_WOULD_BLOCK)) return;
+
+ if (slen < 0) {
+ ERROR("%s - Failed reading response from socket: %s",
+ h->ctx.module_name, fr_syserror(errno));
+ trunk_connection_signal_reconnect(tconn, CONNECTION_FAILED);
+ return;
+ }
+
+ fr_assert(slen >= RADIUS_HEADER_LENGTH); /* checked in verify */
+
+ /*
+ * We only pay attention to Protocol-Error replies.
+ *
+ * All other packets are discarded.
+ */
+ if (h->buffer[0] != FR_RADIUS_CODE_PROTOCOL_ERROR) {
+ continue;
+ }
+
+ /*
+ * Note that we don't care about packet codes. All
+ * packet codes share the same ID space.
+ */
+ rr = radius_track_entry_find(h->tt, h->buffer[1], NULL);
+ if (!rr) {
+ WARN("%s - Ignoring reply with ID %i that arrived too late",
+ h->ctx.module_name, h->buffer[1]);
+ continue;
+ }
+
+ treq = talloc_get_type_abort(rr->uctx, trunk_request_t);
+ request = treq->request;
+ fr_assert(request != NULL);
+ u = talloc_get_type_abort(treq->rctx, bio_request_t);
+ fr_assert(u == treq->preq);
+
+ /*
+ * Decode the incoming packet
+ */
+ reason = decode(request->reply_ctx, &reply, &code, h, request, u, rr->vector, h->buffer, (size_t)slen);
+ if (reason != DECODE_FAIL_NONE) continue;
+
+ /*
+ * Only valid packets are processed
+ * Otherwise an attacker could perform
+ * a DoS attack against the proxying servers
+ * by sending fake responses for upstream
+ * servers.
+ */
+ h->last_reply = now = fr_time();
+
+ /*
+ * Status-Server can have any reply code, we don't care
+ * what it is. So long as it's signed properly, we
+ * accept it. This flexibility is because we don't
+ * expose Status-Server to the admins. It's only used by
+ * this module for internal signalling.
+ */
+ if (u == h->status_u) {
+ fr_pair_list_free(&reply); /* Probably want to pass this to status_check_reply? */
+ status_check_reply(treq, now);
+ trunk_request_signal_complete(treq);
+ continue;
+ }
+
+ /*
+ * Handle any state changes, etc. needed by receiving a
+ * Protocol-Error reply packet.
+ *
+ * Protocol-Error is also permitted as a reply to any
+ * packet.
+ */
+ protocol_error_reply(u, h);
+ }
+}
+
+
/** Remove the request from any tracking structures
*
* Frees encoded packets if the request is being moved to a new connection
.request_cancel = request_cancel,
};
+static const trunk_io_funcs_t io_replicate_funcs = {
+ .connection_alloc = thread_conn_alloc,
+ .connection_notify = thread_conn_notify,
+ .request_prioritise = request_prioritise,
+ .request_mux = request_replicate_mux,
+ .request_demux = request_replicate_demux,
+ .request_conn_release = request_conn_release,
+ .request_complete = request_complete,
+ .request_fail = request_fail,
+ .request_cancel = request_cancel,
+};
+
/** Instantiate thread data for the submodule.
*
*/
&inst->trunk_conf, inst->name, thread, false);
if (!thread->ctx.trunk) return -1;
return 0;
-
case RLM_RADIUS_MODE_REPLICATE:
+ /*
+ * We can replicate over TCP, but that uses trunks.
+ */
+ if (inst->fd_config.socket_type == SOCK_DGRAM) break;
+
+ thread->ctx.trunk = trunk_alloc(thread, mctx->el, &io_replicate_funcs,
+ &inst->trunk_conf, inst->name, thread, false);
+ if (!thread->ctx.trunk) return -1;
+ return 0;
+
case RLM_RADIUS_MODE_UNCONNECTED_REPLICATE:
break;
}
}
/*
- * Allocate the unconnected replication socket.
+ * Allocate an unconnected socket for replication.
*/
thread->bio.fd = fr_bio_fd_alloc(thread, &thread->ctx.fd_config, 0);
if (!thread->bio.fd) {