From: Alan T. DeKok Date: Thu, 9 Sep 2021 13:44:03 +0000 (-0400) Subject: API to write (or enqueue) UDP packets X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=53555a370c54b1380f9c455e1571b1cb81543d4a;p=thirdparty%2Ffreeradius-server.git API to write (or enqueue) UDP packets for use with "fire and forget" UDP messaging, such as DHCP relay, RADIUS replication, etc. --- diff --git a/src/lib/util/libfreeradius-util.mk b/src/lib/util/libfreeradius-util.mk index 16027020f68..528462e290c 100644 --- a/src/lib/util/libfreeradius-util.mk +++ b/src/lib/util/libfreeradius-util.mk @@ -81,6 +81,7 @@ SOURCES := \ types.c \ udp.c \ udpfromto.c \ + udp_queue.c \ uri.c \ value.c \ version.c diff --git a/src/lib/util/udp_queue.c b/src/lib/util/udp_queue.c new file mode 100644 index 00000000000..08d2b96d459 --- /dev/null +++ b/src/lib/util/udp_queue.c @@ -0,0 +1,255 @@ +/* + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA + */ + +/* + * $Id$ + * + * @file src/lib/server/udp_queue.c + * @brief Handle queues of outgoing UDP packets + * + * @author Alan DeKok (aland@freeradius.org) + * + * @copyright 2021 Network RADIUS SAS (legal@networkradius.com) + */ +RCSID("$Id$") + +#include + +#include +#include + +struct fr_udp_queue_s { + fr_udp_queue_config_t const *config; //!< configuration + fr_dlist_head_t queue; //!< list of queued packets to write, ordered by time + + fr_event_list_t *el; + int fd; + bool blocked; //!< are we blocked? + + // @todo - timers, IO callbacks, etc. + + fr_udp_queue_resume_t resume; +}; + +typedef struct { + struct sockaddr_storage sockaddr; + socklen_t socklen; + + fr_udp_queue_t *uq; + fr_dlist_t dlist; + + void *rctx; + + fr_time_t expires; + + size_t packet_len; + uint8_t packet[]; +} fr_udp_queue_entry_t; + +static int _udp_queue_free(fr_udp_queue_t *uq) +{ + fr_dlist_foreach_safe(&uq->queue, fr_udp_queue_entry_t, entry) { + talloc_free(entry); + }} + + return 0; +} + +static int _udp_queue_entry_free(fr_udp_queue_entry_t *entry) +{ + fr_udp_queue_t *uq = entry->uq; + void *rctx = entry->rctx; + + fr_dlist_remove(&uq->queue, entry); + + if (uq->resume) uq->resume(false, rctx); + + return 0; +} + + +fr_udp_queue_t *fr_udp_queue_alloc(TALLOC_CTX *ctx, fr_udp_queue_config_t *config, fr_event_list_t *el, + fr_udp_queue_resume_t resume) +{ + fr_udp_queue_t *uq; + int fd; + + /* + * Open the socket. + */ + fd = fr_socket_server_udp(&config->ipaddr, &config->port, NULL, true); + if (fd < 0) return NULL; + + uq = talloc_zero(ctx, fr_udp_queue_t); + if (!uq) return NULL; + + *uq = (fr_udp_queue_t) { + .config = config, + .el = el, + .fd = fd, + .resume = resume, + }; + + fr_dlist_init(&uq->queue, fr_udp_queue_entry_t, dlist); + + talloc_set_destructor(uq, _udp_queue_free); + + return uq; +} + +static void udp_queue_writable(UNUSED fr_event_list_t *el, UNUSED int fd, + UNUSED int flags, void *uctx) +{ + fr_udp_queue_t *uq = talloc_get_type_abort(uctx, fr_udp_queue_t); + fr_time_t now = fr_time(); + + fr_dlist_foreach_safe(&uq->queue, fr_udp_queue_entry_t, entry) { + ssize_t rcode; + int retries = 0; + + /* + * If the entry is expired, tell the caller that + * it wasn't written to the socket. + */ + if (now >= entry->expires) { + void *rctx = entry->rctx; + + talloc_free(entry); + if (uq->resume) uq->resume(false, rctx); + continue; + } + + retry: + rcode = sendto(uq->fd, entry->packet, entry->packet_len, 0, (struct sockaddr *) &entry->sockaddr, entry->socklen); + if (rcode >= 0) { + void *rctx = entry->rctx; + + talloc_free(entry); + if (uq->resume) uq->resume(true, rctx); + continue; + } + + if (rcode < 0) { + if (errno == EINTR) { + if (retries++ < 3) goto retry; + return; + } + +#if EWOULDBLOCK != EAGAIN + if (!((errno == EWOULDBLOCK) || (errno == EAGAIN))) return; +#else + if (errno != EWOULDBLOCK) return; +#endif + } + }} + + /* + * Nothing more to write, delete the IO handler so that we don't get extraneous signals. + */ + if (fr_dlist_num_elements(&uq->queue) == 0) { + fr_event_fd_delete(uq->el, uq->fd, FR_EVENT_FILTER_IO); + uq->blocked = false; + } +} + +/** Write packet to socket, OR enqueue it if we get EAGAIN + * + * @param ctx the talloc context for this packet to be saved in, usually request_t + * @param uq the local queue to write it to + * @param packet the packet to write + * @param packet_len how long the packet is + * @param ipaddr the IP address we're sending the packet to + * @param port the port we're sending the packet to + * @param rctx for resumption, usually request_t, or a structure which holds a request_t + * @return + * - <0 for error + * - 0 for "didn't write it to socket, but added it to the queue, and the caller should yield" + * - 1 for "wrote it to the socket, you're good to go". + */ +int fr_udp_queue_write(TALLOC_CTX *ctx, fr_udp_queue_t *uq, uint8_t const *packet, size_t packet_len, + fr_ipaddr_t const *ipaddr, int port, void *rctx) +{ + struct sockaddr_storage sockaddr; + socklen_t socklen; + fr_udp_queue_entry_t *entry; + + fr_ipaddr_to_sockaddr(&sockaddr, &socklen, ipaddr, port); + + if (!packet_len || !port) return 1; + + if (!uq->blocked) { + int retries = 0; + ssize_t rcode; + +retry: + rcode = sendto(uq->fd, packet, packet_len, 0, (struct sockaddr *) &sockaddr, socklen); + if (rcode >= 0) return 1; + + if (rcode < 0) { + if (errno == EINTR) { + if (retries++ < 3) goto retry; + return -1; + } + +#if EWOULDBLOCK != EAGAIN + if (!((errno == EWOULDBLOCK) || (errno == EAGAIN))) return -1; +#else + if (errno != EWOULDBLOCK) return -1; +#endif + } + + /* + */ + if (fr_event_fd_insert(uq, uq->el, uq->fd, NULL, + udp_queue_writable, NULL, uq) < 0) { + return -1; + } + + uq->blocked = true; + } + + /* + * Limit the number of packets in the queue. + */ + if (uq->config->max_queued_packets && + (fr_dlist_num_elements(&uq->queue) >= uq->config->max_queued_packets)) { + return -1; + } + + entry = (fr_udp_queue_entry_t *) talloc_zero_array(ctx, uint8_t, sizeof(fr_udp_queue_entry_t) + packet_len); + if (!entry) return -1; + + talloc_set_type(entry, fr_udp_queue_entry_t); + talloc_set_destructor(entry, _udp_queue_entry_free); + + *entry = (fr_udp_queue_entry_t) { + .sockaddr = sockaddr, + .socklen = socklen, + .uq = uq, + .expires = fr_time() + uq->config->max_queued_time, + .rctx = rctx, + .packet_len = packet_len, + }; + + memcpy(entry->packet, packet, packet_len); + fr_dlist_insert_tail(&uq->queue, entry); + + /* + * Didn't do anything, say so. + */ + + return 0; +} diff --git a/src/lib/util/udp_queue.h b/src/lib/util/udp_queue.h new file mode 100644 index 00000000000..a930bb65ba0 --- /dev/null +++ b/src/lib/util/udp_queue.h @@ -0,0 +1,66 @@ +#pragma once +/* + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA + */ + +/** + * $Id$ + * + * @file lib/server/udp_queue.h + * @brief Handle queues of outgoing UDP packets + * + * @copyright 2021 Network RADIUS SAS (legal@networkradius.com) + */ +RCSIDH(udp_queue_h, "$Id$") + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include + +typedef struct { + fr_ipaddr_t ipaddr; //!< socket IP address + uint16_t port; //!< socket port + + char const *interface; //!< Interface to bind to. + + fr_time_delta_t max_queued_time; //!< maximum time a packet can be queued + + uint32_t max_queued_packets; //!< maximum queued packets + + uint32_t send_buff; //!< How big the kernel's send buffer should be. + + bool send_buff_is_set; //!< Whether we were provided with a send_buf +} fr_udp_queue_config_t; + +typedef struct fr_udp_queue_s fr_udp_queue_t; + +typedef void (*fr_udp_queue_resume_t)(bool written, void *rctx); + + +fr_udp_queue_t *fr_udp_queue_alloc(TALLOC_CTX *ctx, fr_udp_queue_config_t *config, fr_event_list_t *el, + fr_udp_queue_resume_t resume) CC_HINT(nonnull(2,3)); + +int fr_udp_queue_write(TALLOC_CTX *ctx, fr_udp_queue_t *uq, + uint8_t const *packet, size_t packet_len, + fr_ipaddr_t const *ipaddr, int port, void *rctx) CC_HINT(nonnull(2,3,5)); + + +#ifdef __cplusplus +} +#endif