]> git.ipfire.org Git - thirdparty/freeradius-server.git/commitdiff
update write_blocked() API
authorAlan T. DeKok <aland@freeradius.org>
Tue, 19 Nov 2024 19:10:54 +0000 (14:10 -0500)
committerAlan T. DeKok <aland@freeradius.org>
Tue, 19 Nov 2024 20:20:30 +0000 (15:20 -0500)
to notify previous BIOs that subsequent ones are blocked.

src/lib/bio/base.c
src/lib/bio/base.h
src/lib/bio/bio_priv.h
src/lib/bio/fd_write.h
src/lib/bio/mem.c
src/lib/bio/packet.c
src/lib/bio/packet.h
src/lib/bio/retry.c

index a5efde18d18815149a234f0283bbf5a9b2f4fb15..1039e999c3afdf58c233a1b55beec9bc41384eca 100644 (file)
@@ -233,6 +233,9 @@ void fr_bio_cb_set(fr_bio_t *bio, fr_bio_cb_funcs_t const *cb)
  *
  *  When a BIO hits EOF, it MUST call this function.  This function will take care of changing the read()
  *  function to return nothing.  It will also take care of walking back up the hierarchy, and calling any
+ *  BIO EOF callbacks.
+ *
+ *  Once all of the BIOs have been marked as blocked, it will call the application EOF callback.
  */
 void fr_bio_eof(fr_bio_t *bio)
 {
@@ -275,3 +278,49 @@ void fr_bio_eof(fr_bio_t *bio)
                this->priv_cb.eof = NULL;
        }
 }
+
+/** Internal BIO function to tell all BIOs that it's blocked.
+ *
+ *  When a BIO blocks on write, it MUST call this function.  This function will take care of walking back up
+ *  the hierarchy, and calling any write_blocked callbacks.
+ *
+ *  Once all of the BIOs have been marked as blocked, it will call the application write_blocked callback.
+ */
+int fr_bio_write_blocked(fr_bio_t *bio)
+{
+       fr_bio_common_t *this = (fr_bio_common_t *) bio;
+       int is_blocked = 1;
+
+       while (true) {
+               fr_bio_common_t *prev = (fr_bio_common_t *) fr_bio_prev(&this->bio);
+               int rcode;
+
+               /*
+                *      There are no more BIOs. Tell the application that the entire BIO chain is blocked.
+                */
+               if (!prev) {
+                       if (this->cb.write_blocked) {
+                               rcode = this->cb.write_blocked(&this->bio);
+                               if (rcode < 0) return rcode;
+                               is_blocked &= (rcode == 1);
+                       }
+                       break;
+               }
+
+               /*
+                *      Go to the previous BIO.  If it doesn't have a write_blocked handler, then keep going
+                *      back up the chain until we're at the top.
+                */
+               this = prev;
+               if (!this->priv_cb.write_blocked) continue;
+
+               /*
+                *      The EOF handler said it's NOT at EOF, so we stop processing here.
+                */
+               rcode = this->priv_cb.write_blocked((fr_bio_t *) this);
+               if (rcode < 0) return rcode;
+               is_blocked &= (rcode == 1);
+       }
+
+       return is_blocked;
+}
index 29d5fefd434ec82936854bfd0ae03f139dc695e3..d6d88389c9a5b739a8e35a3b846c6323af127ca1 100644 (file)
@@ -90,7 +90,7 @@ typedef struct {
        fr_bio_callback_t       failed;                 //!< called when the BIO fails
 
        fr_bio_io_t             read_blocked;
-       fr_bio_io_t             write_blocked;
+       fr_bio_io_t             write_blocked;          //!< returns 0 for "couldn't block", 1 for "did block".
 
        fr_bio_io_t             read_resume;            //!< "unblocked" is too similar to "blocked"
        fr_bio_io_t             write_resume;
index 48fc0af61fb2fcc894ef6f59d6e686ba4ba9d224..55195b4b9de3fa5aebb18f7b86820b4b2f29fdb1 100644 (file)
@@ -93,3 +93,5 @@ static inline void CC_HINT(nonnull) fr_bio_unchain(fr_bio_t *bio)
 }
 
 void   fr_bio_eof(fr_bio_t *bio) CC_HINT(nonnull);
+
+int    fr_bio_write_blocked(fr_bio_t *bio) CC_HINT(nonnull);
index f34c99df2af1055667090c1c3f8a0db3f874f5b7..b4a1b16d18bb7da7ccc1eb5a9292b6babb5448d7 100644 (file)
@@ -20,12 +20,12 @@ if (rcode > 0) {
                fr_assert((size_t) rcode < size);
 
                /*
-                *      Set the flag and run the callback.
+                *      Set the flag, and tell the other BIOs that we're blocked.
                 */
                my->info.write_blocked = true;
 
                if (my->cb.write_blocked) {
-                       error = my->cb.write_blocked((fr_bio_t *) my);
+                       error = fr_bio_write_blocked((fr_bio_t *) my);
                        if (error < 0) return error;
                }
 
index 59a8d3db55caf127c3e9ace01ec8a1b153384cbc..a93341c5a1f5405e35f8cd399361d3396d6aa164 100644 (file)
@@ -544,16 +544,43 @@ static ssize_t fr_bio_mem_write_buffer(fr_bio_t *bio, UNUSED void *packet_ctx, v
        room = fr_bio_buf_write_room(&my->write_buffer);
 
        /*
-        *      The buffer is full.  We're now blocked.
+        *      The buffer is full, we can't write anything.
         */
        if (!room) return fr_bio_error(IO_WOULD_BLOCK);
 
-       if (room < size) size = room;
+       /*
+        *      If we're asked to write more bytes than are available in the buffer, then tell the caller that
+        *      writes are now blocked, and we can't write any more data.
+        *
+        *      Return an WOULD_BLOCK error instead of breaking our promise by writing part of the data,
+        *      instead of accepting a full application write.
+        */
+       if (room < size) {
+               int rcode;
+
+               rcode = fr_bio_write_blocked(bio);
+               if (rcode < 0) return rcode;
+
+               return fr_bio_error(IO_WOULD_BLOCK);
+       }
 
        /*
         *      As we have clamped the write, we know that this call must succeed.
         */
-       return fr_bio_buf_write(&my->write_buffer, buffer, size);
+       (void) fr_bio_buf_write(&my->write_buffer, buffer, size);
+
+       /*
+        *      If we've filled the buffer, tell the caller that writes are now blocked, and we can't write
+        *      any more data.  However, we still return the amount of data we wrote.
+        */
+       if (room == size) {
+               int rcode;
+
+               rcode = fr_bio_write_blocked(bio);
+               if (rcode < 0) return rcode;
+       }
+
+       return size;
 }
 
 /** Peek at the data in the read buffer
index 9d7cc7534eb0cdcc6eb230ef04acea10d7e6d7e9..bdf72473a940e88b13b624668bd3e0cb605e88a4 100644 (file)
 
 /** Inform all of the BIOs that the write is blocked.
  *
- *  This function should be set as the application-layer "write_blocked" callback for all BIOs created as part
+ *  This function should be set as the BIO layer "write_blocked" callback for all BIOs created as part
  *  of a #fr_bio_packet_t.  The application should also set bio->uctx=bio_packet for all BIOs.
  */
-int fr_bio_packet_write_blocked(fr_bio_t *bio)
+static int fr_bio_packet_write_blocked(fr_bio_t *bio)
 {
        fr_bio_packet_t *my = bio->uctx;
-       fr_bio_t *next;
 
        /*
         *      This function must be callable multiple times, as different portions of the BIOs can block at
@@ -42,24 +41,6 @@ int fr_bio_packet_write_blocked(fr_bio_t *bio)
        if (my->write_blocked) return 1;
        my->write_blocked = true;
 
-       /*
-        *      Inform each underlying BIO that it is blocked.  Note that we might be called from a
-        *      lower-layer BIO, so we have to start from the top of the chain.
-        *
-        *      Note that if the callback returns 0, saying "I couldn't block", then we _still_ mark the
-        *      overall BIO as blocked.
-        */
-       for (next = my->bio;
-            next != NULL;
-            next = fr_bio_next(next)) {
-               int rcode;
-
-               if (!((fr_bio_common_t *) next)->priv_cb.write_blocked) continue;
-
-               rcode = ((fr_bio_common_t *) next)->priv_cb.write_blocked(next);
-               if (rcode < 0) return rcode;
-       }
-
        /*
         *      The application doesn't want to know that it's blocked, so we just return.
         */
@@ -71,7 +52,7 @@ int fr_bio_packet_write_blocked(fr_bio_t *bio)
        return my->cb.write_blocked(my);
 }
 
-int fr_bio_packet_write_resume(fr_bio_t *bio)
+static int fr_bio_packet_write_resume(fr_bio_t *bio)
 {
        fr_bio_packet_t *my = bio->uctx;
        fr_bio_t *next;
@@ -110,7 +91,7 @@ int fr_bio_packet_write_resume(fr_bio_t *bio)
        return rcode;
 }
 
-int fr_bio_packet_read_blocked(fr_bio_t *bio)
+static int fr_bio_packet_read_blocked(fr_bio_t *bio)
 {
        fr_bio_packet_t *my = bio->uctx;
 
@@ -119,7 +100,7 @@ int fr_bio_packet_read_blocked(fr_bio_t *bio)
        return my->cb.read_blocked(my);
 }
 
-int fr_bio_packet_read_resume(fr_bio_t *bio)
+static int fr_bio_packet_read_resume(fr_bio_t *bio)
 {
        fr_bio_packet_t *my = bio->uctx;
 
@@ -168,8 +149,12 @@ void fr_bio_packet_connected(fr_bio_t *bio)
        my->cb.connected(my);
 }
 
-static void fr_bio_packet_shutdown(UNUSED fr_bio_t *bio)
+static void fr_bio_packet_shutdown(fr_bio_t *bio)
 {
+       fr_bio_packet_t *my = bio->uctx;
+
+       if (my->cb.shutdown) my->cb.shutdown(my);
+       my->cb.shutdown = NULL;
 }
 
 static void fr_bio_packet_eof(fr_bio_t *bio)
@@ -177,10 +162,15 @@ static void fr_bio_packet_eof(fr_bio_t *bio)
        fr_bio_packet_t *my = bio->uctx;
 
        if (my->cb.eof) my->cb.eof(my);
+       my->cb.eof = NULL;
 }
 
-static void fr_bio_packet_failed(UNUSED fr_bio_t *bio)
+static void fr_bio_packet_failed(fr_bio_t *bio)
 {
+       fr_bio_packet_t *my = bio->uctx;
+
+       if (my->cb.failed) my->cb.failed(my);
+       my->cb.failed = NULL;
 }
 
 
@@ -201,9 +191,9 @@ void fr_bio_packet_init(fr_bio_packet_t *my)
        };
 
        /*
-        *      Every participating BIO has us, and our callbacks set as the application-layer.
+        *      Every participating BIO has us set as the bio->uctx, and we handle all BIO callbacks.
         *
-        *      The application sets itself as my->uctx and as our callbacks.
+        *      The application sets its own pointer my->uctx and sets itself via our callbacks.
         */
        while (bio) {
                bio->uctx = my;
index d983993057385943108116c5a03a5bcc1773c821..67853fd048f654ca4e0d5094fcfdfb72b21164d7 100644 (file)
@@ -181,9 +181,5 @@ static inline CC_HINT(nonnull) int fr_bio_packet_write_flush(fr_bio_packet_t *my
 
 void   fr_bio_packet_connected(fr_bio_t *bio) CC_HINT(nonnull);
 int    fr_bio_packet_connect(fr_bio_t *bio) CC_HINT(nonnull);
-int    fr_bio_packet_write_blocked(fr_bio_t *bio) CC_HINT(nonnull);
-int    fr_bio_packet_write_resume(fr_bio_t *bio) CC_HINT(nonnull);
-int    fr_bio_packet_read_blocked(fr_bio_t *bio) CC_HINT(nonnull);
-int    fr_bio_packet_read_resume(fr_bio_t *bio) CC_HINT(nonnull);
 
 void   fr_bio_packet_init(fr_bio_packet_t *my) CC_HINT(nonnull);
index 78a62d2eb20aa5947738aa3039d11c5d4f785d59..4d6439b8ceb609982052f61665bd41452ebb8f5e 100644 (file)
@@ -127,7 +127,7 @@ struct fr_bio_retry_s {
 static void fr_bio_retry_timer(UNUSED fr_event_list_t *el, fr_time_t now, void *uctx);
 static void fr_bio_retry_expiry_timer(UNUSED fr_event_list_t *el, fr_time_t now, void *uctx);
 static ssize_t fr_bio_retry_write(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size);
-static ssize_t fr_bio_retry_blocked(fr_bio_retry_t *my, fr_bio_retry_entry_t *item, ssize_t rcode);
+static ssize_t fr_bio_retry_save_write(fr_bio_retry_t *my, fr_bio_retry_entry_t *item, ssize_t rcode);
 
 #define fr_bio_retry_timer_clear(_x) do { \
                talloc_const_free((_x)->ev); \
@@ -352,7 +352,7 @@ static int fr_bio_retry_write_item(fr_bio_retry_t *my, fr_bio_retry_entry_t *ite
         *      We didn't write the whole packet, we're blocked.
         */
        if ((size_t) rcode < item->size) {
-               if (fr_bio_retry_blocked(my, item, rcode) < 0) return fr_bio_error(GENERIC); /* oom */
+               if (fr_bio_retry_save_write(my, item, rcode) < 0) return fr_bio_error(GENERIC); /* oom */
 
                return 0;
        }
@@ -498,14 +498,9 @@ static ssize_t fr_bio_retry_write_partial(fr_bio_t *bio, void *packet_ctx, const
        return fr_bio_retry_write(bio, packet_ctx, buffer, size);
 }
 
-/** The write is blocked.
- *
- *  We couldn't write out the entire packet, the bio is blocked.  Don't write anything else until we become
- *  unblocked!
- *
- *  And free the timer.  There's no point in trying to write things if the socket is blocked.
+/** Save a partial packet when the write becomes blocked.
  */
-static ssize_t fr_bio_retry_blocked(fr_bio_retry_t *my, fr_bio_retry_entry_t *item, ssize_t rcode)
+static ssize_t fr_bio_retry_save_write(fr_bio_retry_t *my, fr_bio_retry_entry_t *item, ssize_t rcode)
 {
        fr_assert(!my->partial);
        fr_assert(rcode > 0);
@@ -525,12 +520,12 @@ static ssize_t fr_bio_retry_blocked(fr_bio_retry_t *my, fr_bio_retry_entry_t *it
        fr_bio_buf_write(&my->buffer, item->buffer + rcode, item->size - rcode);
 
        my->partial = item;
-       my->info.write_blocked = true;
 
        /*
-        *      There's no timer, as the write is blocked, so we can't retry.
+        *      If the "next" BIO blocked, then the call to fr_bio_write_blocked() will have already called
+        *      this function.
         */
-       fr_bio_retry_timer_clear(my);
+       if (fr_bio_retry_write_blocked(&my->bio) < 0) return fr_bio_error(GENERIC);
 
        my->bio.write = fr_bio_retry_write_partial;
 
@@ -612,7 +607,7 @@ ssize_t fr_bio_retry_rewrite(fr_bio_t *bio, fr_bio_retry_entry_t *item, const vo
        /*
         *      We had previously written the packet, so save the re-sent one, too.
         */
-       return fr_bio_retry_blocked(my, item, rcode);
+       return fr_bio_retry_save_write(my, item, rcode);
 }
 
 /** A previous timer write had a fatal error, so we forbid further writes.
@@ -830,7 +825,7 @@ static ssize_t fr_bio_retry_write(fr_bio_t *bio, void *packet_ctx, void const *b
         *      We only wrote part of the packet, remember to write the rest of it.
         */
        if ((size_t) rcode < size) {
-               return fr_bio_retry_blocked(my, item, rcode);
+               return fr_bio_retry_save_write(my, item, rcode);
        }
 
        /*