fr_event_list_t *el;
int fd;
+ int port;
bool blocked; //!< are we blocked?
- // @todo - timers, IO callbacks, etc.
-
fr_udp_queue_resume_t resume;
};
return 0;
}
-
-fr_udp_queue_t *fr_udp_queue_alloc(TALLOC_CTX *ctx, fr_udp_queue_config_t *config, fr_event_list_t *el,
+/** Allocate an outbound UDP queue.
+ *
+ * @param ctx where the structure will be allocated.
+ * @param config containing the IPs, ports, etc
+ * @param el the event list for adding events to see if the socket is writable
+ * @param resume the function to call after a delayed packet has been written
+ * @return
+ * - NULL on error
+ * - !NULL on success
+ */
+fr_udp_queue_t *fr_udp_queue_alloc(TALLOC_CTX *ctx, fr_udp_queue_config_t const *config, fr_event_list_t *el,
fr_udp_queue_resume_t resume)
{
fr_udp_queue_t *uq;
int fd;
+ uint16_t port = config->port;
/*
* Open the socket.
*/
- fd = fr_socket_server_udp(&config->ipaddr, &config->port, NULL, true);
+ fd = fr_socket_server_udp(&config->ipaddr, &port, NULL, false);
if (fd < 0) return NULL;
+ /*
+ * Set SO_REUSEPORT if we're binding to a specific port
+ * (e.g. DHCP), so that multiple threads can use the same
+ * port.
+ */
+ if (config->port != 0) {
+ int on = 1;
+
+ if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on)) < 0) goto error;
+ }
+
+ /*
+ * Bind to the given interface.
+ */
+ if (config->interface &&
+ (fr_socket_bind(fd, &config->ipaddr, &port, config->interface) < 0)) goto error;
+
uq = talloc_zero(ctx, fr_udp_queue_t);
if (!uq) {
+ error:
close(fd);
return NULL;
}
.config = config,
.el = el,
.fd = fd,
+ .port = port,
.resume = resume,
};
return uq;
}
+/** If the socket is writable, then flush packets until either it
+ * returns EWOULDBLOCK, or there are no more packets to write.
+ *
+ */
static void udp_queue_writable(UNUSED fr_event_list_t *el, UNUSED int fd,
UNUSED int flags, void *uctx)
{
- fr_udp_queue_t *uq = talloc_get_type_abort(uctx, fr_udp_queue_t);
- fr_time_t now = fr_time();
+ fr_udp_queue_t *uq = talloc_get_type_abort(uctx, fr_udp_queue_t);
+ fr_time_t now = fr_time();
+ uint8_t buffer[1];
+
+ /*
+ * Read and discard any inputs.
+ */
+ while (read(uq->fd, buffer, sizeof(buffer)) > 0) {
+ /* do nothing */
+ }
fr_dlist_foreach_safe(&uq->queue, fr_udp_queue_entry_t, entry) {
ssize_t rcode;
}
/** Write packet to socket, OR enqueue it if we get EAGAIN
+ *
+ * In most cases, the packet will get written to the socket immediately.
+ *
+ * However, if the socket is blocked, then the packet is added to an
+ * outbound queue. When the socket becomes unblocked, the packets
+ * will be sent.
*
* @param ctx the talloc context for this packet to be saved in, usually request_t
* @param uq the local queue to write it to
if (!uq->blocked) {
int retries = 0;
ssize_t rcode;
+ char buffer[1];
+
+ /*
+ * Read and discard any inputs.
+ */
+ while (read(uq->fd, buffer, sizeof(buffer)) > 0) {
+ /* do nothing */
+ }
retry:
rcode = sendto(uq->fd, packet, packet_len, 0, (struct sockaddr *) &sockaddr, socklen);