to notify previous BIOs that subsequent ones are blocked.
*
* When a BIO hits EOF, it MUST call this function. This function will take care of changing the read()
* function to return nothing. It will also take care of walking back up the hierarchy, and calling any
+ * BIO EOF callbacks.
+ *
+ * Once all of the BIOs have been marked as blocked, it will call the application EOF callback.
*/
void fr_bio_eof(fr_bio_t *bio)
{
this->priv_cb.eof = NULL;
}
}
+
+/** Internal BIO function to tell all BIOs that it's blocked.
+ *
+ * When a BIO blocks on write, it MUST call this function. This function will take care of walking back up
+ * the hierarchy, and calling any write_blocked callbacks.
+ *
+ * Once all of the BIOs have been marked as blocked, it will call the application write_blocked callback.
+ */
+int fr_bio_write_blocked(fr_bio_t *bio)
+{
+ fr_bio_common_t *this = (fr_bio_common_t *) bio;
+ int is_blocked = 1;
+
+ while (true) {
+ fr_bio_common_t *prev = (fr_bio_common_t *) fr_bio_prev(&this->bio);
+ int rcode;
+
+ /*
+ * There are no more BIOs. Tell the application that the entire BIO chain is blocked.
+ */
+ if (!prev) {
+ if (this->cb.write_blocked) {
+ rcode = this->cb.write_blocked(&this->bio);
+ if (rcode < 0) return rcode;
+ is_blocked &= (rcode == 1);
+ }
+ break;
+ }
+
+ /*
+ * Go to the previous BIO. If it doesn't have a write_blocked handler, then keep going
+ * back up the chain until we're at the top.
+ */
+ this = prev;
+ if (!this->priv_cb.write_blocked) continue;
+
+ /*
+ * The EOF handler said it's NOT at EOF, so we stop processing here.
+ */
+ rcode = this->priv_cb.write_blocked((fr_bio_t *) this);
+ if (rcode < 0) return rcode;
+ is_blocked &= (rcode == 1);
+ }
+
+ return is_blocked;
+}
fr_bio_callback_t failed; //!< called when the BIO fails
fr_bio_io_t read_blocked;
- fr_bio_io_t write_blocked;
+ fr_bio_io_t write_blocked; //!< returns 0 for "couldn't block", 1 for "did block".
fr_bio_io_t read_resume; //!< "unblocked" is too similar to "blocked"
fr_bio_io_t write_resume;
}
void fr_bio_eof(fr_bio_t *bio) CC_HINT(nonnull);
+
+int fr_bio_write_blocked(fr_bio_t *bio) CC_HINT(nonnull);
fr_assert((size_t) rcode < size);
/*
- * Set the flag and run the callback.
+ * Set the flag, and tell the other BIOs that we're blocked.
*/
my->info.write_blocked = true;
if (my->cb.write_blocked) {
- error = my->cb.write_blocked((fr_bio_t *) my);
+ error = fr_bio_write_blocked((fr_bio_t *) my);
if (error < 0) return error;
}
room = fr_bio_buf_write_room(&my->write_buffer);
/*
- * The buffer is full. We're now blocked.
+ * The buffer is full, we can't write anything.
*/
if (!room) return fr_bio_error(IO_WOULD_BLOCK);
- if (room < size) size = room;
+ /*
+ * If we're asked to write more bytes than are available in the buffer, then tell the caller that
+ * writes are now blocked, and we can't write any more data.
+ *
+ * Return an WOULD_BLOCK error instead of breaking our promise by writing part of the data,
+ * instead of accepting a full application write.
+ */
+ if (room < size) {
+ int rcode;
+
+ rcode = fr_bio_write_blocked(bio);
+ if (rcode < 0) return rcode;
+
+ return fr_bio_error(IO_WOULD_BLOCK);
+ }
/*
* As we have clamped the write, we know that this call must succeed.
*/
- return fr_bio_buf_write(&my->write_buffer, buffer, size);
+ (void) fr_bio_buf_write(&my->write_buffer, buffer, size);
+
+ /*
+ * If we've filled the buffer, tell the caller that writes are now blocked, and we can't write
+ * any more data. However, we still return the amount of data we wrote.
+ */
+ if (room == size) {
+ int rcode;
+
+ rcode = fr_bio_write_blocked(bio);
+ if (rcode < 0) return rcode;
+ }
+
+ return size;
}
/** Peek at the data in the read buffer
/** Inform all of the BIOs that the write is blocked.
*
- * This function should be set as the application-layer "write_blocked" callback for all BIOs created as part
+ * This function should be set as the BIO layer "write_blocked" callback for all BIOs created as part
* of a #fr_bio_packet_t. The application should also set bio->uctx=bio_packet for all BIOs.
*/
-int fr_bio_packet_write_blocked(fr_bio_t *bio)
+static int fr_bio_packet_write_blocked(fr_bio_t *bio)
{
fr_bio_packet_t *my = bio->uctx;
- fr_bio_t *next;
/*
* This function must be callable multiple times, as different portions of the BIOs can block at
if (my->write_blocked) return 1;
my->write_blocked = true;
- /*
- * Inform each underlying BIO that it is blocked. Note that we might be called from a
- * lower-layer BIO, so we have to start from the top of the chain.
- *
- * Note that if the callback returns 0, saying "I couldn't block", then we _still_ mark the
- * overall BIO as blocked.
- */
- for (next = my->bio;
- next != NULL;
- next = fr_bio_next(next)) {
- int rcode;
-
- if (!((fr_bio_common_t *) next)->priv_cb.write_blocked) continue;
-
- rcode = ((fr_bio_common_t *) next)->priv_cb.write_blocked(next);
- if (rcode < 0) return rcode;
- }
-
/*
* The application doesn't want to know that it's blocked, so we just return.
*/
return my->cb.write_blocked(my);
}
-int fr_bio_packet_write_resume(fr_bio_t *bio)
+static int fr_bio_packet_write_resume(fr_bio_t *bio)
{
fr_bio_packet_t *my = bio->uctx;
fr_bio_t *next;
return rcode;
}
-int fr_bio_packet_read_blocked(fr_bio_t *bio)
+static int fr_bio_packet_read_blocked(fr_bio_t *bio)
{
fr_bio_packet_t *my = bio->uctx;
return my->cb.read_blocked(my);
}
-int fr_bio_packet_read_resume(fr_bio_t *bio)
+static int fr_bio_packet_read_resume(fr_bio_t *bio)
{
fr_bio_packet_t *my = bio->uctx;
my->cb.connected(my);
}
-static void fr_bio_packet_shutdown(UNUSED fr_bio_t *bio)
+static void fr_bio_packet_shutdown(fr_bio_t *bio)
{
+ fr_bio_packet_t *my = bio->uctx;
+
+ if (my->cb.shutdown) my->cb.shutdown(my);
+ my->cb.shutdown = NULL;
}
static void fr_bio_packet_eof(fr_bio_t *bio)
fr_bio_packet_t *my = bio->uctx;
if (my->cb.eof) my->cb.eof(my);
+ my->cb.eof = NULL;
}
-static void fr_bio_packet_failed(UNUSED fr_bio_t *bio)
+static void fr_bio_packet_failed(fr_bio_t *bio)
{
+ fr_bio_packet_t *my = bio->uctx;
+
+ if (my->cb.failed) my->cb.failed(my);
+ my->cb.failed = NULL;
}
};
/*
- * Every participating BIO has us, and our callbacks set as the application-layer.
+ * Every participating BIO has us set as the bio->uctx, and we handle all BIO callbacks.
*
- * The application sets itself as my->uctx and as our callbacks.
+ * The application sets its own pointer my->uctx and sets itself via our callbacks.
*/
while (bio) {
bio->uctx = my;
void fr_bio_packet_connected(fr_bio_t *bio) CC_HINT(nonnull);
int fr_bio_packet_connect(fr_bio_t *bio) CC_HINT(nonnull);
-int fr_bio_packet_write_blocked(fr_bio_t *bio) CC_HINT(nonnull);
-int fr_bio_packet_write_resume(fr_bio_t *bio) CC_HINT(nonnull);
-int fr_bio_packet_read_blocked(fr_bio_t *bio) CC_HINT(nonnull);
-int fr_bio_packet_read_resume(fr_bio_t *bio) CC_HINT(nonnull);
void fr_bio_packet_init(fr_bio_packet_t *my) CC_HINT(nonnull);
static void fr_bio_retry_timer(UNUSED fr_event_list_t *el, fr_time_t now, void *uctx);
static void fr_bio_retry_expiry_timer(UNUSED fr_event_list_t *el, fr_time_t now, void *uctx);
static ssize_t fr_bio_retry_write(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size);
-static ssize_t fr_bio_retry_blocked(fr_bio_retry_t *my, fr_bio_retry_entry_t *item, ssize_t rcode);
+static ssize_t fr_bio_retry_save_write(fr_bio_retry_t *my, fr_bio_retry_entry_t *item, ssize_t rcode);
#define fr_bio_retry_timer_clear(_x) do { \
talloc_const_free((_x)->ev); \
* We didn't write the whole packet, we're blocked.
*/
if ((size_t) rcode < item->size) {
- if (fr_bio_retry_blocked(my, item, rcode) < 0) return fr_bio_error(GENERIC); /* oom */
+ if (fr_bio_retry_save_write(my, item, rcode) < 0) return fr_bio_error(GENERIC); /* oom */
return 0;
}
return fr_bio_retry_write(bio, packet_ctx, buffer, size);
}
-/** The write is blocked.
- *
- * We couldn't write out the entire packet, the bio is blocked. Don't write anything else until we become
- * unblocked!
- *
- * And free the timer. There's no point in trying to write things if the socket is blocked.
+/** Save a partial packet when the write becomes blocked.
*/
-static ssize_t fr_bio_retry_blocked(fr_bio_retry_t *my, fr_bio_retry_entry_t *item, ssize_t rcode)
+static ssize_t fr_bio_retry_save_write(fr_bio_retry_t *my, fr_bio_retry_entry_t *item, ssize_t rcode)
{
fr_assert(!my->partial);
fr_assert(rcode > 0);
fr_bio_buf_write(&my->buffer, item->buffer + rcode, item->size - rcode);
my->partial = item;
- my->info.write_blocked = true;
/*
- * There's no timer, as the write is blocked, so we can't retry.
+ * If the "next" BIO blocked, then the call to fr_bio_write_blocked() will have already called
+ * this function.
*/
- fr_bio_retry_timer_clear(my);
+ if (fr_bio_retry_write_blocked(&my->bio) < 0) return fr_bio_error(GENERIC);
my->bio.write = fr_bio_retry_write_partial;
/*
* We had previously written the packet, so save the re-sent one, too.
*/
- return fr_bio_retry_blocked(my, item, rcode);
+ return fr_bio_retry_save_write(my, item, rcode);
}
/** A previous timer write had a fatal error, so we forbid further writes.
* We only wrote part of the packet, remember to write the rest of it.
*/
if ((size_t) rcode < size) {
- return fr_bio_retry_blocked(my, item, rcode);
+ return fr_bio_retry_save_write(my, item, rcode);
}
/*