typedef int (*fr_bio_callback_t)(fr_bio_t *bio); /* activate / shutdown callbacks */
-typedef int (*fr_bio_signal_t)(fr_bio_t *bio); /* read / write pause / resume */
-
typedef struct {
fr_bio_callback_t activate;
fr_bio_callback_t shutdown;
- fr_bio_signal_t read_blocked;
- fr_bio_signal_t write_blocked;
+ fr_bio_callback_t read_blocked;
+ fr_bio_callback_t write_blocked;
- fr_bio_signal_t read_resume; //!< "unblocked" is too similar to "blocked"
- fr_bio_signal_t write_resume;
+ fr_bio_callback_t read_resume; //!< "unblocked" is too similar to "blocked"
+ fr_bio_callback_t write_resume;
} fr_bio_cb_funcs_t;
/** Accept a new connection on a bio
char const *fr_bio_strerror(ssize_t error);
-int fr_bio_cb_set(fr_bio_t *bio, fr_bio_cb_funcs_t const *cb) CC_HINT(nonnull(1));
+void fr_bio_cb_set(fr_bio_t *bio, fr_bio_cb_funcs_t const *cb) CC_HINT(nonnull(1));
#undef _CONST
static bool radius_client_retry_response(fr_bio_t *bio, fr_bio_retry_entry_t **retry_ctx_p, UNUSED void *packet_ctx, const void *buffer, UNUSED size_t size);
static void radius_client_retry_release(fr_bio_t *bio, fr_bio_retry_entry_t *retry_ctx, UNUSED fr_bio_retry_release_reason_t reason);
static ssize_t radius_client_retry(fr_bio_t *bio, fr_bio_retry_entry_t *retry_ctx, UNUSED const void *buffer, NDEBUG_UNUSED size_t size);
+
+static int fr_radius_client_bio_write_blocked(fr_bio_t *bio);
+static int fr_radius_client_bio_write_resume(fr_bio_t *bio);
+
+static const fr_bio_cb_funcs_t client_bio_cb_funcs = {
+ .write_blocked = fr_radius_client_bio_write_blocked,
+ .write_resume = fr_radius_client_bio_write_resume,
+};
+
fr_bio_packet_t *fr_radius_client_bio_alloc(TALLOC_CTX *ctx, fr_radius_client_config_t *cfg, fr_bio_fd_config_t const *fd_cfg)
{
fr_assert(fd_cfg->type == FR_BIO_FD_CONNECTED);
*/
my->fd->uctx = my;
- /*
- * Set up read / write blocked / resume callbacks.
- */
- if (cfg->packet_cb_cfg.write_blocked || cfg->packet_cb_cfg.read_blocked) {
- fr_radius_client_bio_cb_set((fr_bio_packet_t *) my, &cfg->packet_cb_cfg);
- }
-
my->info.fd_info = fr_bio_fd_info(my->fd);
fr_assert(my->info.fd_info != NULL);
my->mem = fr_bio_mem_alloc(my, read_size, 2 * 4096, my->fd);
if (!my->mem) goto fail;
+
my->mem->uctx = &my->cfg.verify;
if (cfg->packet_cb_cfg.retry) rewrite = radius_client_retry;
my->retry = fr_bio_retry_alloc(my, 256, radius_client_retry_sent, radius_client_retry_response,
rewrite, radius_client_retry_release, &cfg->retry_cfg, my->mem);
if (!my->retry) goto fail;
+
my->retry->uctx = my;
my->info.retry_info = fr_bio_retry_info(my->retry);
my->common.bio = my->retry;
+ /*
+ * Inform all BIOs about the write pause / resume callbacks
+ */
+ fr_radius_client_bio_cb_set(bio, cfg->packet_cb_cfg);
+
/*
* Set up the connected status.
*/
return rcode;
}
+/** Callback for when the writes are blocked.
+ *
+ */
static int fr_radius_client_bio_write_blocked(fr_bio_t *bio)
{
fr_radius_client_fd_bio_t *my = bio->uctx;
int rcode;
+ /*
+ * This function must be callable multiple times, as different portions of the BIOs can block at
+ * different times.
+ */
if (my->common.write_blocked) return 1;
+ /*
+ * Mark the retry code as blocked, so that it stops trying to write packets.
+ */
rcode = fr_bio_retry_write_blocked(my->retry);
if (rcode < 0) return rcode;
+ /*
+ * The application doesn't want to know that it's blocked, so we just return.
+ */
if (!my->common.cb.write_blocked) {
my->common.write_blocked = true;
return 1;
}
+ /*
+ * Tell the application that IO is blocked.
+ */
rcode = my->common.cb.write_blocked(&my->common);
if (rcode <= 0) return rcode;
}
+/** Callback for when the writes can be resumed
+ *
+ */
static int fr_radius_client_bio_write_resume(fr_bio_t *bio)
{
fr_radius_client_fd_bio_t *my = bio->uctx;
int rcode;
+ /*
+ * This function must be callable multiple times, as different portions of the BIOs can block or
+ * resume at different times.
+ */
if (!my->common.write_blocked) return 1;
/*
rcode = fr_bio_mem_write_resume(my->mem);
if (rcode <= 0) return rcode;
+ /*
+ * Flush the packets which should have been retried, but weren't due to blocking.
+ */
rcode = fr_bio_retry_write_resume(my->retry);
if (rcode <= 0) return rcode;
*/
if (my->all_ids_used) return 0;
+ /*
+ * The application doesn't want to know that it's resumed, so we just return.
+ */
if (!my->common.cb.write_resume) {
my->common.write_blocked = false;
return 1;
}
+ /*
+ * Tell the application that IO has resumed.
+ */
rcode = my->common.cb.write_resume(&my->common);
if (rcode <= 0) return rcode;
return 1;
}
-int fr_radius_client_bio_cb_set(fr_bio_packet_t *bio, fr_bio_packet_cb_funcs_t const *cb)
+void fr_radius_client_bio_cb_set(fr_bio_packet_t *bio, fr_bio_packet_cb_funcs_t const *cb)
{
fr_radius_client_fd_bio_t *my = talloc_get_type_abort(bio, fr_radius_client_fd_bio_t);
fr_bio_cb_funcs_t bio_cb = {};
my->common.cb = *cb;
- return fr_bio_cb_set(my->fd, &bio_cb);
+ fr_bio_cb_set(my->fd, &bio_cb);
+ fr_bio_cb_set(my->mem, &bio_cb);
+ fr_bio_cb_set(my->retry, &bio_cb);
}