]> git.ipfire.org Git - thirdparty/freeradius-server.git/commitdiff
cleanups and fixes
authorAlan T. DeKok <aland@freeradius.org>
Thu, 9 Sep 2021 15:34:56 +0000 (11:34 -0400)
committerAlan T. DeKok <aland@freeradius.org>
Thu, 9 Sep 2021 18:10:56 +0000 (14:10 -0400)
bind to interface
set REUSEPORT
read and discard any replies

src/lib/util/udp_queue.c
src/lib/util/udp_queue.h

index 1d48adab704b43b3e8c5ff1d7f1953df5a4eed1a..76615692d9bde795e2c57edc9dd59e942cdb7cff 100644 (file)
@@ -37,10 +37,9 @@ struct fr_udp_queue_s {
 
        fr_event_list_t         *el;
        int                     fd;
+       int                     port;
        bool                    blocked;                //!< are we blocked?
 
-       // @todo - timers, IO callbacks, etc.
-
        fr_udp_queue_resume_t   resume;
 };
 
@@ -82,21 +81,49 @@ static int _udp_queue_entry_free(fr_udp_queue_entry_t *entry)
        return 0;
 }
 
-
-fr_udp_queue_t *fr_udp_queue_alloc(TALLOC_CTX *ctx, fr_udp_queue_config_t *config, fr_event_list_t *el,
+/** Allocate an outbound UDP queue.
+ *
+ * @param ctx  where the structure will be allocated.
+ * @param config containing the IPs, ports, etc
+ * @param el   the event list for adding events to see if the socket is writable
+ * @param resume the function to call after a delayed packet has been written
+ * @return
+ *     - NULL on error
+ *     - !NULL on success
+ */
+fr_udp_queue_t *fr_udp_queue_alloc(TALLOC_CTX *ctx, fr_udp_queue_config_t const *config, fr_event_list_t *el,
                                   fr_udp_queue_resume_t resume)
 {
        fr_udp_queue_t *uq;
        int fd;
+       uint16_t port = config->port;
 
        /*
         *      Open the socket.
         */
-       fd = fr_socket_server_udp(&config->ipaddr, &config->port, NULL, true);
+       fd = fr_socket_server_udp(&config->ipaddr, &port, NULL, false);
        if (fd < 0) return NULL;
 
+       /*
+        *      Set SO_REUSEPORT if we're binding to a specific port
+        *      (e.g. DHCP), so that multiple threads can use the same
+        *      port.
+        */
+       if (config->port != 0) {
+               int on = 1;
+
+               if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on)) < 0) goto error;
+       }
+
+       /*
+        *      Bind to the given interface.
+        */
+       if (config->interface &&
+           (fr_socket_bind(fd, &config->ipaddr, &port, config->interface) < 0)) goto error;
+
        uq = talloc_zero(ctx, fr_udp_queue_t);
        if (!uq) {
+       error:
                close(fd);
                return NULL;
        }
@@ -105,6 +132,7 @@ fr_udp_queue_t *fr_udp_queue_alloc(TALLOC_CTX *ctx, fr_udp_queue_config_t *confi
                .config = config,
                .el = el,
                .fd = fd,
+               .port = port,
                .resume = resume,
        };
 
@@ -115,11 +143,23 @@ fr_udp_queue_t *fr_udp_queue_alloc(TALLOC_CTX *ctx, fr_udp_queue_config_t *confi
        return uq;
 }
 
+/** If the socket is writable, then flush packets until either it
+ * returns EWOULDBLOCK, or there are no more packets to write.
+ *
+ */
 static void udp_queue_writable(UNUSED fr_event_list_t *el, UNUSED int fd,
                               UNUSED int flags, void *uctx)
 {
-       fr_udp_queue_t *uq = talloc_get_type_abort(uctx, fr_udp_queue_t);
-       fr_time_t now = fr_time();
+       fr_udp_queue_t  *uq = talloc_get_type_abort(uctx, fr_udp_queue_t);
+       fr_time_t       now = fr_time();
+       uint8_t         buffer[1];
+
+       /*
+        *      Read and discard any inputs.
+        */
+       while (read(uq->fd, buffer, sizeof(buffer)) > 0) {
+               /* do nothing */
+       }
 
        fr_dlist_foreach_safe(&uq->queue, fr_udp_queue_entry_t, entry) {
                ssize_t rcode;
@@ -171,6 +211,12 @@ static void udp_queue_writable(UNUSED fr_event_list_t *el, UNUSED int fd,
 }
 
 /** Write packet to socket, OR enqueue it if we get EAGAIN
+ *
+ *  In most cases, the packet will get written to the socket immediately.
+ *
+ *  However, if the socket is blocked, then the packet is added to an
+ *  outbound queue.  When the socket becomes unblocked, the packets
+ *  will be sent.
  *
  * @param ctx  the talloc context for this packet to be saved in, usually request_t
  * @param uq   the local queue to write it to
@@ -198,6 +244,14 @@ int fr_udp_queue_write(TALLOC_CTX *ctx, fr_udp_queue_t *uq, uint8_t const *packe
        if (!uq->blocked) {
                int retries = 0;
                ssize_t rcode;
+               char buffer[1];
+
+               /*
+                *      Read and discard any inputs.
+                */
+               while (read(uq->fd, buffer, sizeof(buffer)) > 0) {
+                       /* do nothing */
+               }
 
 retry:
                rcode = sendto(uq->fd, packet, packet_len, 0, (struct sockaddr *) &sockaddr, socklen);
index a930bb65ba0c97ea0ed86cc64df1cecf69ea60cd..5e51a17858ed3616967f08b5865a1eb23b324158 100644 (file)
@@ -53,7 +53,7 @@ typedef struct fr_udp_queue_s fr_udp_queue_t;
 typedef void (*fr_udp_queue_resume_t)(bool written, void *rctx);
 
 
-fr_udp_queue_t *fr_udp_queue_alloc(TALLOC_CTX *ctx, fr_udp_queue_config_t *config, fr_event_list_t *el,
+fr_udp_queue_t *fr_udp_queue_alloc(TALLOC_CTX *ctx, fr_udp_queue_config_t const *config, fr_event_list_t *el,
                                   fr_udp_queue_resume_t resume) CC_HINT(nonnull(2,3));
 
 int fr_udp_queue_write(TALLOC_CTX *ctx, fr_udp_queue_t *uq,