]> git.ipfire.org Git - thirdparty/freeradius-server.git/commitdiff
add callbacks for read/write blocked, and read/write resume
authorAlan T. DeKok <aland@freeradius.org>
Tue, 14 May 2024 18:36:07 +0000 (14:36 -0400)
committerAlan T. DeKok <aland@freeradius.org>
Tue, 14 May 2024 18:44:59 +0000 (14:44 -0400)
for now, only write blocked/resume is plumbed in.

src/lib/bio/base.h
src/lib/bio/fd.c
src/lib/bio/fd_errno.h
src/lib/bio/fd_write.h [new file with mode: 0644]

index b3549cfb06fa84d6e66f22e90b8686ded7c8c447..93f4d6861e7ccc927935412466a763cdfdcb0126 100644 (file)
@@ -86,6 +86,12 @@ typedef int (*fr_bio_callback_t)(fr_bio_t *bio); /* activate / shutdown callback
 typedef struct {
        fr_bio_callback_t       activate;
        fr_bio_callback_t       shutdown;
+
+       fr_bio_callback_t       read_blocked;
+       fr_bio_callback_t       write_blocked;
+
+       fr_bio_callback_t       read_resume;            //!< "unblocked" is too similar to "blocked"
+       fr_bio_callback_t       write_resume;
 } fr_bio_cb_funcs_t;
 
 /** Accept a new connection on a bio
index 0de587887903f000ff585244a239e25735bbb00d..57adc6ec4bd4de3a4a6544199f4b7a38f05bc047 100644 (file)
@@ -132,7 +132,7 @@ retry:
        }
 
 #undef flag_blocked
-#define flag_blocked info.read_blocked
+#define flag_blocked read_blocked
 #include "fd_errno.h"
 
        return fr_bio_error(IO);
@@ -158,7 +158,7 @@ retry:
        if (rcode >= 0) return rcode;
 
 #undef flag_blocked
-#define flag_blocked info.read_blocked
+#define flag_blocked read_blocked
 #include "fd_errno.h"
 
        return fr_bio_error(IO);
@@ -193,16 +193,15 @@ retry:
                return rcode;
        }
 
-       if (rcode == 0 ) return rcode;
+       if (rcode == 0) return rcode;
 
 #undef flag_blocked
-#define flag_blocked info.read_blocked
+#define flag_blocked read_blocked
 #include "fd_errno.h"
 
        return fr_bio_error(IO);
 }
 
-
 /** Write to fd.
  *
  *  This function is used for connected sockets, where we ignore the packet_ctx.
@@ -218,8 +217,6 @@ static ssize_t fr_bio_fd_write(fr_bio_t *bio, UNUSED void *packet_ctx, const voi
         */
        if (!buffer) return 0;
 
-       my->info.write_blocked = false;
-
 retry:
        /*
         *      We could call send() instead of write()!  Posix says:
@@ -236,11 +233,8 @@ retry:
         *      here.
         */
        rcode = write(my->info.socket.fd, buffer, size);
-       if (rcode >= 0) return rcode;
 
-#undef flag_blocked
-#define flag_blocked info.write_blocked
-#include "fd_errno.h"
+#include "fd_write.h"
 
        return fr_bio_error(IO);
 }
@@ -262,18 +256,13 @@ static ssize_t fr_bio_fd_sendto(fr_bio_t *bio, void *packet_ctx, const void *buf
         */
        if (!buffer) return 0;
 
-       my->info.write_blocked = false;
-
        // get destination IP
        (void) fr_ipaddr_to_sockaddr(&sockaddr, &salen, &addr->socket.inet.dst_ipaddr, addr->socket.inet.dst_port);
 
 retry:
        rcode = sendto(my->info.socket.fd, buffer, size, 0, (struct sockaddr *) &sockaddr, salen);
-       if (rcode >= 0) return rcode;
 
-#undef flag_blocked
-#define flag_blocked info.write_blocked
-#include "fd_errno.h"
+#include "fd_write.h"
 
        return fr_bio_error(IO);
 }
@@ -325,7 +314,7 @@ retry:
        if (rcode == 0) return rcode;
 
 #undef flag_blocked
-#define flag_blocked info.read_blocked
+#define flag_blocked read_blocked
 #include "fd_errno.h"
 
        return fr_bio_error(IO);
@@ -413,8 +402,6 @@ static ssize_t fr_bio_fd_sendfromto4(fr_bio_t *bio, void *packet_ctx, const void
        fr_bio_fd_t *my = talloc_get_type_abort(bio, fr_bio_fd_t);
        fr_bio_fd_packet_ctx_t *addr = fr_bio_fd_packet_ctx(my, packet_ctx);
 
-       my->info.write_blocked = false;
-
        memset(&my->cbuf, 0, sizeof(my->cbuf));
        memset(&my->msgh, 0, sizeof(struct msghdr));
 
@@ -470,9 +457,7 @@ retry:
        rcode = sendmsg(my->info.socket.fd, &my->msgh, 0);
        if (rcode >= 0) return rcode;
 
-#undef flag_blocked
-#define flag_blocked info.write_blocked
-#include "fd_errno.h"
+#include "fd_write.h"
 
        return fr_bio_error(IO);
 }
@@ -566,8 +551,6 @@ static ssize_t fr_bio_fd_sendfromto6(fr_bio_t *bio, void *packet_ctx, const void
        fr_bio_fd_t *my = talloc_get_type_abort(bio, fr_bio_fd_t);
        fr_bio_fd_packet_ctx_t *addr = fr_bio_fd_packet_ctx(my, packet_ctx);
 
-       my->info.write_blocked = false;
-
        memset(&my->cbuf, 0, sizeof(my->cbuf));
        memset(&my->msgh, 0, sizeof(struct msghdr));
 
@@ -607,11 +590,8 @@ static ssize_t fr_bio_fd_sendfromto6(fr_bio_t *bio, void *packet_ctx, const void
 
 retry:
        rcode = sendmsg(my->info.socket.fd, &my->msgh, 0);
-       if (rcode >= 0) return rcode;
 
-#undef flag_blocked
-#define flag_blocked info.write_blocked
-#include "fd_errno.h"
+#include "fd_write.h"
 
        return fr_bio_error(IO);
 }
@@ -730,7 +710,10 @@ retry:
                  *      to call fr_bio_fd_connect() before calling write()
                  */
         case EINPROGRESS:
+               if (!my->info.write_blocked && my->cb.write_blocked) my->cb.write_blocked((fr_bio_t *) my);
+
                my->info.write_blocked = true;
+
                return fr_bio_error(IO_WOULD_BLOCK);
 
         default:
@@ -805,11 +788,8 @@ int fr_bio_fd_init_connected(fr_bio_fd_t *my)
 
        my->info.eof = false;
 
-       /*
-        *      The socket shouldn't be selected for read.  But it should be selected for write.
-        */
        my->info.read_blocked = false;
-       my->info.write_blocked = true;
+       my->info.write_blocked = false;
 
 #ifdef SO_NOSIGPIPE
        /*
@@ -836,6 +816,9 @@ int fr_bio_fd_init_connected(fr_bio_fd_t *my)
 
        if (rcode != fr_bio_error(IO_WOULD_BLOCK)) return rcode;
 
+       /*
+        *      The socket is blocked, and should be selected for writing.
+        */
        fr_assert(my->info.write_blocked);
        fr_assert(my->info.state == FR_BIO_FD_STATE_CONNECTING);
 
@@ -1035,8 +1018,8 @@ fr_bio_t *fr_bio_fd_alloc(TALLOC_CTX *ctx, fr_bio_cb_funcs_t *cb, fr_bio_fd_conf
                                .af = AF_UNSPEC,
                        },
                        .type = FR_BIO_FD_UNCONNECTED,
-                       .read_blocked = true,
-                       .write_blocked = true,
+                       .read_blocked = false,
+                       .write_blocked = false,
                        .eof = false,
                        .state = FR_BIO_FD_STATE_CLOSED,
                };
@@ -1187,7 +1170,7 @@ retry:
        if (rcode >= 0) return 0;
 
 #undef flag_blocked
-#define flag_blocked info.read_blocked
+#define flag_blocked read_blocked
 #include "fd_errno.h"
 
        return fr_bio_error(IO);
index a9a42893ccb7a91f93c7e1f37b6fbf138e3620f2..edc227631cd142766175bbac9a7c9694bd2bff8d 100644 (file)
@@ -1,5 +1,5 @@
 /*
- *     Code snippet to avoid duplication.
+ *     We have an error, so we have common error handling code.
  */
 switch (errno) {
 case EINTR:
@@ -17,7 +17,9 @@ case EAGAIN:
        /*
         *      The operation would block, return that.
         */
-       my->flag_blocked = true;
+       if (!my->info.flag_blocked && my->cb.flag_blocked) my->cb.flag_blocked((fr_bio_t *) my);
+
+       my->info.flag_blocked = true;
        return fr_bio_error(IO_WOULD_BLOCK);
 
 default:
diff --git a/src/lib/bio/fd_write.h b/src/lib/bio/fd_write.h
new file mode 100644 (file)
index 0000000..19516e1
--- /dev/null
@@ -0,0 +1,47 @@
+/*
+ *     Common finalization code for the write functions.
+ *
+ *     This is in a header file because of "goto retry" in fd_errno.h.
+ *
+ *     @todo - do we want the callbacks to notify the _previous_ BIO in the chain?  That way the top-level
+ *     BIO can notify the application.
+ */
+if (rcode > 0) {
+       /*
+        *      We weren't blocked, but we are now.
+        */
+       if (!my->info.write_blocked) {
+               if ((size_t) rcode == size) {
+                       return rcode;
+               }
+
+               fr_assert((size_t) rcode < size);
+
+               /*
+                *      Set the flag and run the callback.
+                */
+               my->info.write_blocked = true;
+               if (my->cb.write_blocked) my->cb.write_blocked((fr_bio_t *) my);
+
+               return rcode;   
+       }
+
+       /*
+        *      We were blocked.  We're still blocked if we wrote _less_ than the amount of requested data.
+        *      If we wrote all of the data which was requested, then we're unblocked.
+        */
+       my->info.write_blocked = ((size_t) rcode == size);
+
+       /*
+        *      Call the "resume" function if we transitioned to being unblocked.
+        */
+       if (!my->info.write_blocked && my->cb.write_resume) my->cb.write_resume((fr_bio_t *) my);
+
+       return rcode;
+}
+
+if (rcode == 0) return rcode;
+
+#undef flag_blocked
+#define flag_blocked write_blocked
+#include "fd_errno.h"