From: Stefan Metzmacher Date: Thu, 15 May 2025 10:01:12 +0000 (+0200) Subject: lib/tsocket: add tstream_monitor_send/recv infrastructure X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=18e304487c9d247f093c890cde9d73c2a95daae1;p=thirdparty%2Fsamba.git lib/tsocket: add tstream_monitor_send/recv infrastructure Signed-off-by: Stefan Metzmacher Reviewed-by: Volker Lendecke --- diff --git a/lib/tsocket/tsocket.c b/lib/tsocket/tsocket.c index 92d8e927263..68fdb6c3b40 100644 --- a/lib/tsocket/tsocket.c +++ b/lib/tsocket/tsocket.c @@ -446,8 +446,11 @@ struct tstream_context { struct tevent_req *readv_req; struct tevent_req *writev_req; struct tevent_req *disconnect_req; + struct tevent_req *monitor_req; }; +static void tstream_monitor_disconnect(struct tstream_context *stream, int err); + static int tstream_context_destructor(struct tstream_context *stream) { if (stream->readv_req) { @@ -462,6 +465,10 @@ static int tstream_context_destructor(struct tstream_context *stream) tevent_req_received(stream->disconnect_req); } + if (stream->monitor_req != NULL) { + tevent_req_received(stream->monitor_req); + } + return 0; } @@ -485,6 +492,7 @@ struct tstream_context *_tstream_context_create(TALLOC_CTX *mem_ctx, stream->readv_req = NULL; stream->writev_req = NULL; stream->disconnect_req = NULL; + stream->monitor_req = NULL; state = talloc_size(stream, psize); if (state == NULL) { @@ -508,7 +516,16 @@ void *_tstream_context_data(struct tstream_context *stream) ssize_t tstream_pending_bytes(struct tstream_context *stream) { - return stream->ops->pending_bytes(stream); + ssize_t ret; + + ret = stream->ops->pending_bytes(stream); + if (ret == -1) { + int sys_errno = errno; + tstream_monitor_disconnect(stream, sys_errno); + errno = sys_errno; + } + + return ret; } struct tstream_readv_state { @@ -610,6 +627,7 @@ static void tstream_readv_done(struct tevent_req *subreq) ret = state->ops->readv_recv(subreq, &sys_errno); TALLOC_FREE(subreq); if (ret == -1) { + tstream_monitor_disconnect(state->stream, sys_errno); tevent_req_error(req, sys_errno); return; } @@ -732,6 +750,7 @@ static void tstream_writev_done(struct tevent_req *subreq) ret = state->ops->writev_recv(subreq, &sys_errno); if (ret == -1) { + tstream_monitor_disconnect(state->stream, sys_errno); tevent_req_error(req, sys_errno); return; } @@ -832,10 +851,13 @@ static void tstream_disconnect_done(struct tevent_req *subreq) ret = state->ops->disconnect_recv(subreq, &sys_errno); if (ret == -1) { + tstream_monitor_disconnect(state->stream, sys_errno); tevent_req_error(req, sys_errno); return; } + tstream_monitor_disconnect(state->stream, ECONNABORTED); + tevent_req_done(req); } @@ -850,3 +872,144 @@ int tstream_disconnect_recv(struct tevent_req *req, return ret; } +struct tstream_monitor_state { + const struct tstream_context_ops *ops; + struct tstream_context *stream; + struct tevent_req *subreq; + int ret; +}; + +static void tstream_monitor_cleanup(struct tevent_req *req, + enum tevent_req_state req_state) +{ + struct tstream_monitor_state *state = + tevent_req_data(req, + struct tstream_monitor_state); + + TALLOC_FREE(state->subreq); + + if (state->stream != NULL) { + state->stream->monitor_req = NULL; + state->stream = NULL; + } +} + +static void tstream_monitor_done(struct tevent_req *subreq); + +struct tevent_req *tstream_monitor_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct tstream_context *stream) +{ + struct tevent_req *req = NULL; + struct tstream_monitor_state *state = NULL; + struct tevent_req *subreq = NULL; + ssize_t pending; + + req = tevent_req_create(mem_ctx, &state, + struct tstream_monitor_state); + if (req == NULL) { + return NULL; + } + + state->ops = stream->ops; + state->stream = stream; + state->ret = -1; + + if (stream->disconnect_req != NULL) { + tevent_req_error(req, ECONNABORTED); + return tevent_req_post(req, ev); + } + + if (stream->monitor_req != NULL) { + tevent_req_error(req, EALREADY); + return tevent_req_post(req, ev); + } + stream->monitor_req = req; + + tevent_req_set_cleanup_fn(req, tstream_monitor_cleanup); + tevent_req_defer_callback(req, ev); + + if (state->ops->monitor_send != NULL) { + subreq = state->ops->monitor_send(state, + ev, + stream); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, tstream_monitor_done, req); + + state->subreq = subreq; + return req; + } + + pending = stream->ops->pending_bytes(stream); + if (pending < 0) { + tevent_req_error(req, errno); + return tevent_req_post(req, ev); + } + + /* + * just remain forever pending until + * tstream_monitor_disconnect() is triggered + * in any way. + */ + return req; +} + +static void tstream_monitor_done(struct tevent_req *subreq) +{ + struct tevent_req *req = + tevent_req_callback_data(subreq, + struct tevent_req); + struct tstream_monitor_state *state = + tevent_req_data(req, + struct tstream_monitor_state); + ssize_t ret; + int sys_errno; + + state->subreq = NULL; + + ret = state->ops->monitor_recv(subreq, &sys_errno); + TALLOC_FREE(subreq); + if (ret == -1) { + tevent_req_error(req, sys_errno); + return; + } + + state->ret = ret; + + tevent_req_done(req); +} + +static void tstream_monitor_disconnect(struct tstream_context *stream, int err) +{ + if (stream == NULL) { + return; + } + + if (stream->monitor_req == NULL) { + return; + } + + /* + * tevent_req_defer_callback() was used + * so it's safe to call + */ + tevent_req_error(stream->monitor_req, err); +} + +int tstream_monitor_recv(struct tevent_req *req, int *perrno) +{ + struct tstream_monitor_state *state = + tevent_req_data(req, + struct tstream_monitor_state); + int ret; + + ret = tsocket_simple_int_recv(req, perrno); + if (ret == 0) { + ret = state->ret; + } + + tevent_req_received(req); + return ret; +} diff --git a/lib/tsocket/tsocket.h b/lib/tsocket/tsocket.h index 07e10c84a8a..deae754c28e 100644 --- a/lib/tsocket/tsocket.h +++ b/lib/tsocket/tsocket.h @@ -472,6 +472,40 @@ struct tevent_req *tstream_disconnect_send(TALLOC_CTX *mem_ctx, int tstream_disconnect_recv(struct tevent_req *req, int *perrno); +/** + * @brief Monitor the state of the stream + * + * This waits forever until a connection error happens. + * + * @param[in] mem_ctx The talloc memory context to use. + * + * @param[in] ev The tevent_context to run on. + * + * @param[in] stream The tstream context to work on. + * + * @return A 'tevent_req' handle, where the caller can register + * a callback with tevent_req_set_callback(). NULL on + * fatal error. + */ +struct tevent_req *tstream_monitor_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct tstream_context *stream); + +/** + * @brief Get the result of a tstream_monitor_send(). + * + * The caller can only have one outstanding tstream_monitor_send() + * at a time otherwise the caller will get *perrno = EBUSY. + * + * @param[in] req The tevent request from tstream_readv_send(). + * + * @param[out] perrno The error number. + * + * @return -1 with perrno set to the actual errno. + * (0 is never returned!). + */ +int tstream_monitor_recv(struct tevent_req *req, int *perrno); + /** * @} */ diff --git a/lib/tsocket/tsocket_internal.h b/lib/tsocket/tsocket_internal.h index 154b2ce6f89..d334c99992c 100644 --- a/lib/tsocket/tsocket_internal.h +++ b/lib/tsocket/tsocket_internal.h @@ -122,6 +122,14 @@ struct tstream_context_ops { struct tstream_context *stream); int (*disconnect_recv)(struct tevent_req *req, int *perrno); + + /* + * Optional + */ + struct tevent_req *(*monitor_send)(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct tstream_context *stream); + int (*monitor_recv)(struct tevent_req *req, int *perrno); }; struct tstream_context *_tstream_context_create(TALLOC_CTX *mem_ctx,