From: Alan T. DeKok Date: Tue, 9 Apr 2024 19:43:33 +0000 (-0400) Subject: flush items after write becomes unblocked X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=00c950f83b21ab311c818efaba6be3a89eebbd28;p=thirdparty%2Ffreeradius-server.git flush items after write becomes unblocked --- diff --git a/src/lib/bio/retry.c b/src/lib/bio/retry.c index cfd09664a20..d1545c86967 100644 --- a/src/lib/bio/retry.c +++ b/src/lib/bio/retry.c @@ -69,6 +69,8 @@ struct fr_bio_retry_s { fr_retry_config_t retry_config; + ssize_t error; + fr_event_timer_t const *ev; /* @@ -99,7 +101,7 @@ struct fr_bio_retry_s { static void fr_bio_retry_timer(UNUSED fr_event_list_t *el, fr_time_t now, void *uctx); static ssize_t fr_bio_retry_write(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size); - +static ssize_t fr_bio_retry_blocked(fr_bio_retry_t *my, fr_bio_retry_entry_t *item, ssize_t rcode); /** Reset the timer after changing the rb tree. * @@ -179,6 +181,113 @@ static void fr_bio_retry_release(fr_bio_retry_t *my, fr_bio_retry_entry_t *item, fr_bio_retry_list_insert_head(&my->free, item); } +/** Write one item. + * + * @return + * - <0 on error + * - 0 for "can't write any more" + * - 1 for "wrote a packet" + */ +static int fr_bio_retry_write_item(fr_bio_retry_t *my, fr_bio_retry_entry_t *item, fr_time_t now) +{ + ssize_t rcode; + fr_retry_state_t state; + + /* + * Are we there yet? + * + * Release it, indicating whether or not we successfully got a reply. + */ + state = fr_retry_next(&item->retry, now); + if (state != FR_RETRY_CONTINUE) { + fr_bio_retry_release(my, item, (fr_bio_retry_release_reason_t) item->have_reply); + return 1; + } + + /* + * Write out the packet. On failure release this item. + * + * If there's an error, we hope that the next "real" write will find the error, and do any + * necessary cleanups. Note that we can't call bio shutdown here, as the bio is controlled by the + * application, and not by us. + */ + if (item->rewrite) { + rcode = item->rewrite(&my->bio, item, item->buffer, item->size); + } else { + rcode = my->rewrite(&my->bio, item, item->buffer, item->size); + } + if (rcode < 0) { + fr_bio_retry_release(my, item, FR_BIO_RETRY_WRITE_ERROR); + return rcode; + } + + /* + * We didn't write the whole packet, we're blocked. + */ + if ((size_t) rcode < item->size) { + if (fr_bio_retry_blocked(my, item, rcode) < 0) return fr_bio_error(GENERIC); /* oom */ + + return 0; + } + + /* + * We wrote the whole packet. Remove it from the tree, which is done _without_ doing calls to + * cmp(), so we it's OK for us to rewrite item->retry.next. + */ + (void) fr_rb_remove_by_inline_node(&my->rb, &item->node); + + /* + * We have more things to do, insert the entry back into the tree, and update the timer. + */ + (void) fr_rb_insert(&my->rb, item); + + return 1; +} + +/* + * Check for the "next next" retry. If that's still in the past, + * then skip it. But _don't_ update retry.count, as we don't + * send packets. Instead, just enforce MRD, etc. + */ +static int fr_bio_retry_write_delayed(fr_bio_retry_t *my, fr_time_t now) +{ + fr_bio_retry_entry_t *item; + fr_bio_t *next; + + fr_assert(!my->partial); + + /* + * There must be a next bio. + */ + next = fr_bio_next(&my->bio); + fr_assert(next != NULL); + + while ((item = fr_rb_first(&my->rb)) != NULL) { + int rcode; + + /* + * This item needs to be sent in the future, we're done. + */ + if (fr_time_cmp(now, item->retry.next) > 0) break; + + /* + * Write one item, and don't update timers. + */ + rcode = fr_bio_retry_write_item(my, item, now); + if (rcode <= 0) return rcode; + } + + /* + * Now that we've written multiple items, reset the timer. + * + * We do this at the end of the loop so that we don't update it for each item in the loop. + */ + (void) fr_bio_retry_reset_timer(my); + + return 0; +} + + /** There's a partial packet written. Write all of that one first, before writing another packet. * * The packet can either be cancelled, or IO blocked. In either case, we must write the full packet before @@ -256,13 +365,10 @@ static ssize_t fr_bio_retry_write_partial(fr_bio_t *bio, void *packet_ctx, const /* * We're supposed to send the next retry now. i.e. the socket has been blocked for a * long time. - * - * Send the packet, _but_ also immediately check for the "next next" retry. If that's - * still in the past, then skip it. But _don't_ update retry.count, as we don't send - * packets. Instead, just enforce MRD, etc. */ if (fr_time_cmp(now, item->retry.next) <= 0) { - fr_assert(0); + rcode = fr_bio_retry_write_delayed(my, now); + if (rcode < 0) return rcode; } /* @@ -382,13 +488,24 @@ ssize_t fr_bio_retry_rewrite(fr_bio_t *bio, fr_bio_retry_entry_t *item, const vo return fr_bio_retry_blocked(my, item, rcode); } +/** A previous timer write had a fatal error, so we forbid further writes. + * + */ +static ssize_t fr_bio_retry_write_fatal(fr_bio_t *bio, UNUSED void *packet_ctx, UNUSED void const *buffer, UNUSED size_t size) +{ + fr_bio_retry_t *my = talloc_get_type_abort(bio, fr_bio_retry_t); + + return my->error; +} +/** Run a timer event. Usually to write out another packet. + * + */ static void fr_bio_retry_timer(UNUSED fr_event_list_t *el, fr_time_t now, void *uctx) { ssize_t rcode; fr_bio_retry_t *my = talloc_get_type_abort(uctx, fr_bio_retry_t); fr_bio_retry_entry_t *item; - fr_retry_state_t state; /* * For the timer to be running, there must be a "first" entry which causes the timer to fire. @@ -402,56 +519,27 @@ static void fr_bio_retry_timer(UNUSED fr_event_list_t *el, fr_time_t now, void * item = my->first; /* - * Are we there yet? - * - * Release it, indicating whether or not we successfully got a reply. + * Retry one item. */ - state = fr_retry_next(&item->retry, now); - if (state != FR_RETRY_CONTINUE) { - fr_bio_retry_release(my, item, (fr_bio_retry_release_reason_t) item->have_reply); + rcode = fr_bio_retry_write_item(my, item, now); + if (rcode < 0) { + fr_assert(rcode != fr_bio_error(IO_WOULD_BLOCK)); /* should return 0 instead! */ + + my->error = rcode; + my->bio.write = fr_bio_retry_write_fatal; return; } /* - * Write out the packet. On failure release this item. - * - * If there's an error, we hope that the next "real" write will find the error, and do any - * necessary cleanups. Note that we can't call bio shutdown here, as the bio is controlled by the - * application, and not by us. + * Partial write - no timers get set. */ - if (item->rewrite) { - rcode = item->rewrite(&my->bio, item, item->buffer, item->size); - } else { - rcode = my->rewrite(&my->bio, item, item->buffer, item->size); - } - if (rcode < 0) { - fr_bio_retry_release(my, item, FR_BIO_RETRY_WRITE_ERROR); - return; - } + if (rcode == 0) return; /* - * We wrote the whole packet. Update the various timers and return. + * We successfull wrote this item. Reset the timer to the next one, which is likely to be a + * different one from the item we just updated. */ - if ((size_t) rcode == item->size) { - /* - * Remove it from the tree, which is done _without_ doing calls to cmp(), so we it's OK - * for us to rewrite item->retry.next. - */ - (void) fr_rb_remove_by_inline_node(&my->rb, &item->node); - - /* - * We have more things to do, insert the entry back into the tree, and update the timer. - */ - (void) fr_rb_insert(&my->rb, item); - - /* - * We're not done, reset the timer to the next one, which is likely to be a different one from - * the item we just updated. - */ - (void) fr_bio_retry_reset_timer(my); - - return; - } + (void) fr_bio_retry_reset_timer(my); } /** Write a request, and see if we have a reply.